1111
1212import org .elasticsearch .action .ActionListener ;
1313import org .elasticsearch .action .ActionRunnable ;
14+ import org .elasticsearch .action .support .SubscribableListener ;
15+ import org .elasticsearch .common .Strings ;
1416import org .elasticsearch .common .settings .Setting ;
1517import org .elasticsearch .common .settings .Settings ;
1618import org .elasticsearch .common .unit .Processors ;
2527import java .util .concurrent .CyclicBarrier ;
2628import java .util .concurrent .Executor ;
2729import java .util .concurrent .ExecutorService ;
30+ import java .util .concurrent .Executors ;
31+ import java .util .concurrent .Semaphore ;
2832import java .util .concurrent .ThreadFactory ;
2933import java .util .concurrent .ThreadPoolExecutor ;
3034import java .util .concurrent .TimeUnit ;
35+ import java .util .concurrent .TimeoutException ;
3136import java .util .concurrent .atomic .AtomicBoolean ;
3237
3338import static org .elasticsearch .common .util .concurrent .EsExecutors .TaskTrackingConfig .DEFAULT ;
@@ -295,6 +300,7 @@ public void run() {
295300 }
296301 try {
297302 executor .execute (new Runnable () {
303+
298304 @ Override
299305 public void run () {
300306 // Doesn't matter is going to be rejected
@@ -757,7 +763,7 @@ public void onRejection(Exception e) {
757763 }
758764
759765 public void testScalingWithEmptyCore () {
760- testScalingWithEmptyCore (
766+ testScalingWithEmptyCoreAndMaxSingleThread (
761767 EsExecutors .newScaling (
762768 getTestName (),
763769 0 ,
@@ -772,7 +778,7 @@ public void testScalingWithEmptyCore() {
772778 }
773779
774780 public void testScalingWithEmptyCoreAndKeepAlive () {
775- testScalingWithEmptyCore (
781+ testScalingWithEmptyCoreAndMaxSingleThread (
776782 EsExecutors .newScaling (
777783 getTestName (),
778784 0 ,
@@ -787,9 +793,7 @@ public void testScalingWithEmptyCoreAndKeepAlive() {
787793 }
788794
789795 public void testScalingWithEmptyCoreAndLargerMaxSize () {
790- // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
791- // https://github.com/elastic/elasticsearch/issues/124867
792- testScalingWithEmptyCore (
796+ testScalingWithEmptyCoreAndMaxMultipleThreads (
793797 EsExecutors .newScaling (
794798 getTestName (),
795799 0 ,
@@ -804,9 +808,7 @@ public void testScalingWithEmptyCoreAndLargerMaxSize() {
804808 }
805809
806810 public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize () {
807- // TODO currently the reproduction of the starvation bug does not work if max pool size > 1
808- // https://github.com/elastic/elasticsearch/issues/124867
809- testScalingWithEmptyCore (
811+ testScalingWithEmptyCoreAndMaxMultipleThreads (
810812 EsExecutors .newScaling (
811813 getTestName (),
812814 0 ,
@@ -821,10 +823,8 @@ public void testScalingWithEmptyCoreAndKeepAliveAndLargerMaxSize() {
821823 }
822824
823825 public void testScalingWithEmptyCoreAndWorkerPoolProbing () {
824- // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
825- // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
826826 // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
827- testScalingWithEmptyCore (
827+ testScalingWithEmptyCoreAndMaxSingleThread (
828828 new EsThreadPoolExecutor (
829829 getTestName (),
830830 0 ,
@@ -840,10 +840,8 @@ public void testScalingWithEmptyCoreAndWorkerPoolProbing() {
840840 }
841841
842842 public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing () {
843- // https://github.com/elastic/elasticsearch/issues/124667 is difficult to reproduce if max pool size > 1.
844- // if probing mitigates the bug for max pool size = 1, we're good for larger pool sizes as well.
845843 // the executor is created directly here, newScaling doesn't use ExecutorScalingQueue & probing if max pool size = 1.
846- testScalingWithEmptyCore (
844+ testScalingWithEmptyCoreAndMaxSingleThread (
847845 new EsThreadPoolExecutor (
848846 getTestName (),
849847 0 ,
@@ -858,11 +856,13 @@ public void testScalingWithEmptyCoreAndKeepAliveAndWorkerPoolProbing() {
858856 );
859857 }
860858
861- private void testScalingWithEmptyCore (EsThreadPoolExecutor executor ) {
859+ private void testScalingWithEmptyCoreAndMaxSingleThread (EsThreadPoolExecutor testSubject ) {
862860 try {
861+ final var keepAliveNanos = testSubject .getKeepAliveTime (TimeUnit .NANOSECONDS );
862+
863863 class Task extends AbstractRunnable {
864- private int remaining ;
865864 private final CountDownLatch doneLatch ;
865+ private int remaining ;
866866
867867 Task (int iterations , CountDownLatch doneLatch ) {
868868 this .remaining = iterations ;
@@ -879,29 +879,108 @@ protected void doRun() {
879879 if (--remaining == 0 ) {
880880 doneLatch .countDown ();
881881 } else {
882- logger .trace ("--> remaining [{}]" , remaining );
883- final long keepAliveNanos = executor .getKeepAliveTime (TimeUnit .NANOSECONDS );
884882 new Thread (() -> {
885883 if (keepAliveNanos > 0 ) {
886- final var targetNanoTime = System .nanoTime () + keepAliveNanos + between (-10_000 , 10_000 );
887- while (System .nanoTime () < targetNanoTime ) {
888- Thread .yield ();
889- }
884+ waitUntilKeepAliveTime (keepAliveNanos );
890885 }
891- executor .execute (Task .this );
886+ testSubject .execute (Task .this );
892887 }).start ();
893888 }
894889 }
895890 }
896891
897892 for (int i = 0 ; i < 20 ; i ++) {
898- logger .trace ("--> attempt [{}]" , i );
899893 final var doneLatch = new CountDownLatch (1 );
900- executor .execute (new Task (between (1 , 500 ), doneLatch ));
894+ testSubject .execute (new Task (between (1 , 500 ), doneLatch ));
901895 safeAwait (doneLatch , TimeValue .ONE_MINUTE );
902896 }
903897 } finally {
904- ThreadPool .terminate (executor , 1 , TimeUnit .SECONDS );
898+ ThreadPool .terminate (testSubject , 1 , TimeUnit .SECONDS );
899+ }
900+ }
901+
902+ private void testScalingWithEmptyCoreAndMaxMultipleThreads (EsThreadPoolExecutor testSubject ) {
903+ final var keepAliveNanos = testSubject .getKeepAliveTime (TimeUnit .NANOSECONDS );
904+ // Use max pool size with one additional scheduler task if a keep alive time is set.
905+ final var schedulerTasks = testSubject .getMaximumPoolSize () + (keepAliveNanos > 0 ? 1 : 0 );
906+
907+ class TaskScheduler {
908+ final SubscribableListener <Void > result = new SubscribableListener <>();
909+ final ExecutorService scheduler ;
910+ final CyclicBarrier cyclicBarrier ;
911+ final Semaphore taskCompletions ;
912+ private int remaining ;
913+
914+ TaskScheduler (ExecutorService scheduler , int iterations ) {
915+ this .scheduler = scheduler ;
916+ this .taskCompletions = new Semaphore (0 );
917+ this .cyclicBarrier = new CyclicBarrier (schedulerTasks , () -> remaining --);
918+ this .remaining = iterations ;
919+ }
920+
921+ public void start () {
922+ // The scheduler tasks are running on the dedicated scheduler thread pool. Each task submits
923+ // a test task on the EsThreadPoolExecutor (`testSubject`) releasing one `taskCompletions` permit.
924+ final Runnable schedulerTask = () -> {
925+ try {
926+ while (remaining > 0 ) {
927+ // Wait for all scheduler threads to be ready for the next attempt.
928+ var first = cyclicBarrier .await (SAFE_AWAIT_TIMEOUT .millis (), TimeUnit .MILLISECONDS ) == schedulerTasks - 1 ;
929+ if (first && keepAliveNanos > 0 ) {
930+ // The task submitted by the first scheduler task (after reaching the keep alive time) is the task
931+ // that might starve without any worker available unless an additional worker probe is submitted.
932+ waitUntilKeepAliveTime (keepAliveNanos );
933+ }
934+ // Test EsThreadPoolExecutor by submitting a task that releases one permit.
935+ testSubject .execute (taskCompletions ::release );
936+ if (first ) {
937+ // Let the first scheduler task (by arrival on the barrier) wait for all permits.
938+ var success = taskCompletions .tryAcquire (
939+ schedulerTasks ,
940+ SAFE_AWAIT_TIMEOUT .millis (),
941+ TimeUnit .MILLISECONDS
942+ );
943+ if (success == false ) {
944+ var msg = Strings .format (
945+ "timed out waiting for [%s] of [%s] tasks to complete [queue size: %s, workers: %s] " ,
946+ schedulerTasks - taskCompletions .availablePermits (),
947+ schedulerTasks ,
948+ testSubject .getQueue ().size (),
949+ testSubject .getPoolSize ()
950+ );
951+ result .onFailure (new TimeoutException (msg ));
952+ return ;
953+ }
954+ }
955+ }
956+ } catch (Exception e ) {
957+ result .onFailure (e );
958+ return ;
959+ }
960+ result .onResponse (null );
961+ };
962+ // Run scheduler tasks on the dedicated scheduler thread pool.
963+ for (int i = 0 ; i < schedulerTasks ; i ++) {
964+ scheduler .execute (schedulerTask );
965+ }
966+ }
967+ }
968+
969+ try (var scheduler = Executors .newFixedThreadPool (schedulerTasks )) {
970+ for (int i = 0 ; i < 100 ; i ++) {
971+ TaskScheduler taskScheduler = new TaskScheduler (scheduler , between (10 , 200 ));
972+ taskScheduler .start ();
973+ safeAwait (taskScheduler .result );
974+ }
975+ } finally {
976+ ThreadPool .terminate (testSubject , 1 , TimeUnit .SECONDS );
977+ }
978+ }
979+
980+ private void waitUntilKeepAliveTime (long keepAliveNanos ) {
981+ var targetNanoTime = System .nanoTime () + keepAliveNanos + between (-1_000 , 1_000 );
982+ while (System .nanoTime () < targetNanoTime ) {
983+ Thread .yield ();
905984 }
906985 }
907986}
0 commit comments