@@ -631,12 +631,15 @@ synchronized void forcefullyStopProcess() {
631631 logger .debug (() -> format ("[%s] Forcefully stopping process" , task .getDeploymentId ()));
632632 prepareInternalStateForShutdown ();
633633
634- if (priorityProcessWorker .isShutdown ()) {
635- // most likely there was a crash or exception that caused the
636- // thread to stop. Notify any waiting requests in the work queue
637- handleAlreadyShuttingDownWorker ();
638- } else {
639- priorityProcessWorker .shutdown ();
634+ priorityProcessWorker .shutdownNow ();
635+ try {
636+ // wait for any currently executing work to finish
637+ if (priorityProcessWorker .awaitTermination (10L , TimeUnit .SECONDS )) {
638+ priorityProcessWorker .notifyQueueRunnables ();
639+ }
640+ } catch (InterruptedException e ) {
641+ Thread .currentThread ().interrupt ();
642+ logger .info (Strings .format ("[%s] Interrupted waiting for process worker after shutdownNow" , PROCESS_NAME ));
640643 }
641644
642645 killProcessIfPresent ();
@@ -649,12 +652,6 @@ private void prepareInternalStateForShutdown() {
649652 stateStreamer .cancel ();
650653 }
651654
652- private void handleAlreadyShuttingDownWorker () {
653- logger .debug (() -> format ("[%s] Process worker was already marked for shutdown" , task .getDeploymentId ()));
654-
655- priorityProcessWorker .notifyQueueRunnables ();
656- }
657-
658655 private void killProcessIfPresent () {
659656 try {
660657 if (process .get () == null ) {
@@ -675,15 +672,7 @@ private void closeNlpTaskProcessor() {
675672 private synchronized void stopProcessAfterCompletingPendingWork () {
676673 logger .debug (() -> format ("[%s] Stopping process after completing its pending work" , task .getDeploymentId ()));
677674 prepareInternalStateForShutdown ();
678-
679- if (priorityProcessWorker .isShutdown ()) {
680- // most likely there was a crash or exception that caused the
681- // thread to stop. Notify any waiting requests in the work queue
682- handleAlreadyShuttingDownWorker ();
683- } else {
684- signalAndWaitForWorkerTermination ();
685- }
686-
675+ signalAndWaitForWorkerTermination ();
687676 stopProcessGracefully ();
688677 closeNlpTaskProcessor ();
689678 }
@@ -707,6 +696,8 @@ private void awaitTerminationAfterCompletingWork() throws TimeoutException {
707696 throw new TimeoutException (
708697 Strings .format ("Timed out waiting for process worker to complete for process %s" , PROCESS_NAME )
709698 );
699+ } else {
700+ priorityProcessWorker .notifyQueueRunnables ();
710701 }
711702 } catch (InterruptedException e ) {
712703 Thread .currentThread ().interrupt ();
0 commit comments