Conversation

@mosche (Contributor) commented Mar 17, 2025

Extends the reproduction of scaling EsExecutors bug #124667 to work with max pool size > 1.
However, this doesn't reproduce reliably and requires lots of iterations (tests.iters) to catch the bug for max pool size > 1 (and it hardly ever reproduces when using a keep-alive). I'm open to suggestions / ideas, but I'm leaning towards considering this sufficient.

Note: a seemingly obvious approach would be to block (max - 1) threads in the pool. This causes work to starve in a way that looks very similar to the core=0/max=1 case. However, there is a significant difference: although the pool has capacity for a spare worker, the work is starved by the blocked workers rather than by the fact that no worker is running at all. As soon as any of the workers is unblocked, the pool continues processing the task queue. That the pool queues work despite having spare capacity isn't optimal either, but I'm treating that as a separate issue.
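
As a rough, illustrative sketch of that difference (plain JDK only, not EsExecutors; the class name and parameters are invented for the example, and core = max - 1 with an unbounded queue is merely a stand-in for how the scaling queue can keep work queued while the pool stays below its maximum size):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockedWorkersSketch {
    public static void main(String[] args) throws Exception {
        int max = 4;
        // core = max - 1 with an unbounded queue: the queue always accepts new work,
        // so the pool never grows to its maximum size (rough analogy, not EsExecutors)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(max - 1, max, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < max - 1; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // block (max - 1) workers
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown); // queued behind the blocked workers

        // starved while the workers are blocked, despite the spare capacity in the pool ...
        System.out.println("ran while blocked: " + done.await(500, TimeUnit.MILLISECONDS));
        // ... but as soon as any worker is unblocked the queue is processed again
        release.countDown();
        System.out.println("ran after unblocking: " + done.await(5, TimeUnit.SECONDS));
        pool.shutdown();
    }
}

Running this prints false while the workers are blocked and true once they are released: the queued work is delayed behind the blocked workers rather than starved indefinitely, which is why that setup doesn't reproduce the bug.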

Relates to #124867, ES-10640

@mosche mosche added >test Issues or PRs that are addressing/adding tests :Core/Infra/Core Core issues without another label labels Mar 17, 2025
@elasticsearchmachine elasticsearchmachine added Team:Core/Infra Meta label for core/infra team v9.1.0 labels Mar 17, 2025
@elasticsearchmachine (Collaborator) commented

Pinging @elastic/es-core-infra (Team:Core/Infra)

@mosche mosche requested a review from a team March 18, 2025 07:43
@mosche (Contributor, Author) commented Mar 21, 2025

Btw, here's another reproduction attempt I tried initially. It repeatedly fills the pool with work and lets it all complete at the same time using a CyclicBarrier. My expectation was that this would trigger the starvation issue for both core size = 0 and core size > 0. However, it looks like we don't run into the issue if multiple threads expire at around the same time.

private void testScalingWithEmptyCore(EsThreadPoolExecutor esExecutor) {
        class Scheduler implements AutoCloseable {
            final ExecutorService scheduler = Executors.newSingleThreadExecutor();
            final long keepAliveNanos = esExecutor.getKeepAliveTime(TimeUnit.NANOSECONDS);
            final int maximumPoolSize = esExecutor.getMaximumPoolSize();
            final boolean isEsScalingQueue = esExecutor.getQueue() instanceof EsExecutors.ExecutorScalingQueue<?>;

            final Semaphore success = new Semaphore(0);
            final CyclicBarrier barrier = new CyclicBarrier(maximumPoolSize + 1);

            volatile int remaining = resetRemaining();

            private int resetRemaining() {
                return between(10, 200);
            }

            final Runnable work = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    fail(e);
                }

                @Override
                protected void doRun() throws Exception {
                    barrier.await(); // wait for all work + testPool to be ready to proceed
                }
            };

            final Runnable continuation = () -> {
                if (remaining > 0) {
                    remaining--;
                    testPoolAsync();
                } else {
                    remaining = resetRemaining(); // reset for next round
                    success.release();
                }
            };

            final Runnable testPool = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    fail(e);
                }

                @Override
                protected void doRun() throws Exception {
                    for (int count = 0; count < maximumPoolSize;) {
                        esExecutor.execute(work);
                        if (isEsScalingQueue && removeQueuedWork()) {
                            Thread.yield(); // yield and try again
                        } else {
                            count++;
                        }
                    }
                    barrier.await(); // wait for all work to be running
                    if (keepAliveNanos > 0) {
                        var targetNanoTime = System.nanoTime() + keepAliveNanos + between(-1_000, 1_000);
                        while (System.nanoTime() < targetNanoTime) {
                            Thread.yield();
                        }
                    }
                    esExecutor.execute(continuation);
                }

                // remove work that is queued due to ExecutorScalingQueue so we can be sure all work is running
                private boolean removeQueuedWork() {
                    boolean workWasQueued = false;
                    Runnable queuedWork;
                    while ((queuedWork = ThreadContext.unwrap(esExecutor.getQueue().poll())) != null) {
                        logger.trace(
                            "{} was queued [poolSize={}, maximumPoolSize={}, activeCount={}, remaining={}]",
                            queuedWork == work ? "WORK" : "OTHER", // could be EsThreadPoolExecutor.WORKER_PROBE
                            esExecutor.getPoolSize(),
                            maximumPoolSize,
                            esExecutor.getActiveCount(),
                            remaining
                        );
                        workWasQueued |= queuedWork == work;
                    }
                    return workWasQueued;
                }
            };

            public void testPoolAsync() {
                scheduler.execute(testPool);
            }

            @Override
            public void close() {
                success.release();
                scheduler.shutdownNow();
            }
        }

        try (var scheduler = new Scheduler()) {
            for (int i = 0; i < 5000; i++) {
                scheduler.testPoolAsync();
                safeAcquire(scheduler.success);

            }
        } finally {
            ThreadPool.terminate(esExecutor, 1, TimeUnit.SECONDS);
        }
    }

@mosche mosche marked this pull request as draft March 21, 2025 13:50
@DaveCTurner (Contributor) left a comment

I don't have a problem with this being hard-to-reproduce - in practice we run thousands of iterations of these things every day, we will notice a bug eventually.

@mosche (Contributor, Author) commented Mar 28, 2025

Thanks for the feedback @DaveCTurner ... I was a bit distracted this week being on-week, but I'll wrap this up mid next week 👍

@mosche mosche marked this pull request as ready for review April 3, 2025 09:29
@mosche (Contributor, Author) commented Apr 4, 2025

@DaveCTurner I've addressed your feedback

@DaveCTurner (Contributor) left a comment

LGTM (couple of nits)

@mosche mosche added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Apr 7, 2025
@mosche mosche changed the title from "Improved reproduction of scaling EsExecutors bug #124667 to work with max pool size > 0." to "Improved reproduction of scaling EsExecutors bug #124667 to work with max pool size > 1." Apr 7, 2025
@mosche mosche merged commit 0360db2 into elastic:main Apr 7, 2025
16 of 17 checks passed
@mosche mosche deleted the ktlo/esExecutorBug_reproduction branch April 7, 2025 13:40
