Skip to content

Commit 69a3f84

Browse files
authored
[ML] Wait for the worker service to shut down before closing task processor (#117920) (#118165) (#118171)
1 parent 8f4f54e commit 69a3f84

File tree

2 files changed

+18
-21
lines changed

2 files changed

+18
-21
lines changed

docs/changelog/117920.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 117920
2+
summary: Wait for the worker service to shut down before closing task processor
3+
area: Machine Learning
4+
type: bug
5+
issues:
6+
- 117563

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/deployment/DeploymentManager.java

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -631,12 +631,15 @@ synchronized void forcefullyStopProcess() {
631631
logger.debug(() -> format("[%s] Forcefully stopping process", task.getDeploymentId()));
632632
prepareInternalStateForShutdown();
633633

634-
if (priorityProcessWorker.isShutdown()) {
635-
// most likely there was a crash or exception that caused the
636-
// thread to stop. Notify any waiting requests in the work queue
637-
handleAlreadyShuttingDownWorker();
638-
} else {
639-
priorityProcessWorker.shutdown();
634+
priorityProcessWorker.shutdownNow();
635+
try {
636+
// wait for any currently executing work to finish
637+
if (priorityProcessWorker.awaitTermination(10L, TimeUnit.SECONDS)) {
638+
priorityProcessWorker.notifyQueueRunnables();
639+
}
640+
} catch (InterruptedException e) {
641+
Thread.currentThread().interrupt();
642+
logger.info(Strings.format("[%s] Interrupted waiting for process worker after shutdownNow", PROCESS_NAME));
640643
}
641644

642645
killProcessIfPresent();
@@ -649,12 +652,6 @@ private void prepareInternalStateForShutdown() {
649652
stateStreamer.cancel();
650653
}
651654

652-
private void handleAlreadyShuttingDownWorker() {
653-
logger.debug(() -> format("[%s] Process worker was already marked for shutdown", task.getDeploymentId()));
654-
655-
priorityProcessWorker.notifyQueueRunnables();
656-
}
657-
658655
private void killProcessIfPresent() {
659656
try {
660657
if (process.get() == null) {
@@ -675,15 +672,7 @@ private void closeNlpTaskProcessor() {
675672
private synchronized void stopProcessAfterCompletingPendingWork() {
676673
logger.debug(() -> format("[%s] Stopping process after completing its pending work", task.getDeploymentId()));
677674
prepareInternalStateForShutdown();
678-
679-
if (priorityProcessWorker.isShutdown()) {
680-
// most likely there was a crash or exception that caused the
681-
// thread to stop. Notify any waiting requests in the work queue
682-
handleAlreadyShuttingDownWorker();
683-
} else {
684-
signalAndWaitForWorkerTermination();
685-
}
686-
675+
signalAndWaitForWorkerTermination();
687676
stopProcessGracefully();
688677
closeNlpTaskProcessor();
689678
}
@@ -707,6 +696,8 @@ private void awaitTerminationAfterCompletingWork() throws TimeoutException {
707696
throw new TimeoutException(
708697
Strings.format("Timed out waiting for process worker to complete for process %s", PROCESS_NAME)
709698
);
699+
} else {
700+
priorityProcessWorker.notifyQueueRunnables();
710701
}
711702
} catch (InterruptedException e) {
712703
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)