-
Notifications
You must be signed in to change notification settings - Fork 25.7k
[ML] Ensure queued AbstractRunnables are notified when executor stops #135966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5f5eded
eb9f19b
83fefe2
2767061
4e09877
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 135966 | ||
| summary: Ensure queued `AbstractRunnables` are notified when executor stops | ||
| area: Machine Learning | ||
| type: bug | ||
| issues: | ||
| - 134651 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,11 +125,12 @@ public void start() { | |
| running.set(false); | ||
| } | ||
| } | ||
|
|
||
| notifyQueueRunnables(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| // If we're throwing an exception, shutdown() may not have been called, so call it here | ||
| shutdown(); | ||
| notifyQueueRunnables(); | ||
| Runnable onComplete = onCompletion.get(); | ||
| if (onComplete != null) { | ||
| onComplete.run(); | ||
|
|
@@ -155,20 +156,22 @@ public synchronized void notifyQueueRunnables() { | |
| format("[%s] notifying [%d] queued requests that have not been processed before shutdown", processName, queue.size()) | ||
| ); | ||
|
|
||
| List<Runnable> notExecuted = new ArrayList<>(); | ||
| List<T> notExecuted = new ArrayList<>(); | ||
| queue.drainTo(notExecuted); | ||
|
|
||
| String msg = "unable to process as " + processName + " worker service has shutdown"; | ||
| Exception ex = error.get(); | ||
| for (Runnable runnable : notExecuted) { | ||
| if (runnable instanceof AbstractRunnable ar) { | ||
| if (ex != null) { | ||
| ar.onFailure(ex); | ||
| } else { | ||
| ar.onRejection(new EsRejectedExecutionException(msg, true)); | ||
| } | ||
| } | ||
| for (T runnable : notExecuted) { | ||
| notifyIfAbstractRunnable(runnable, error.get(), "unable to process as " + processName + " worker service has shutdown"); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| protected static void notifyAbstractRunnable(Exception ex, String msg, AbstractRunnable ar) { | ||
| if (ex != null) { | ||
| ar.onFailure(ex); | ||
| } else { | ||
| ar.onRejection(new EsRejectedExecutionException(msg, true)); | ||
| } | ||
| } | ||
|
|
||
| protected abstract void notifyIfAbstractRunnable(T runnable, Exception ex, String msg); | ||
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option is make
OrderedRunnableextendAbstractRunnableand implement the required methods as calling therunnableparameterThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This solution adds more code than the original one, but I think it might be better, since it simplifies the logic for determining what we need to notify.