diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ef65d6d22caa..0f2839e7fba6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1744,7 +1744,7 @@ public void runInternal() checkIfStreamInactiveAndTurnSupervisorIdle(); // If supervisor is already stopping, don't contend for stateChangeLock since the block can be skipped - if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) { + if (isStopping()) { logDebugReport(); return; } @@ -1752,7 +1752,7 @@ public void runInternal() synchronized (stateChangeLock) { // if supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop - if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) { + if (isStopping()) { // if we're already terminating, don't do anything here, the terminate already handles shutdown log.debug("Supervisor[%s] for datasource[%s] is already stopping.", supervisorId, dataSource); } else if (stateManager.isIdle()) { @@ -2024,7 +2024,16 @@ private void killTask(final String id, String reasonFormat, Object... args) { Optional taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - taskQueue.get().shutdown(id, reasonFormat, args); + if (isStopping()) { + log.debug( + "Not shutting down task[%s] because the supervisor[%s] has been stopped. Reason was[%s]", + id, + supervisorId, + StringUtils.format(reasonFormat, args) + ); + } else { + taskQueue.get().shutdown(id, reasonFormat, args); + } } else { log.error("Failed to get task queue because I'm not the leader!"); } @@ -2034,7 +2043,16 @@ private void killTaskWithSuccess(final String id, String reasonFormat, Object... { Optional taskQueue = taskMaster.getTaskQueue(); if (taskQueue.isPresent()) { - taskQueue.get().shutdownWithSuccess(id, reasonFormat, args); + if (isStopping()) { + log.debug( + "Not shutting down task[%s] because the supervisor[%s] has been stopped. Reason was[%s]", + id, + supervisorId, + StringUtils.format(reasonFormat, args) + ); + } else { + taskQueue.get().shutdownWithSuccess(id, reasonFormat, args); + } } else { log.error("Failed to get task queue because I'm not the leader!"); } @@ -4562,6 +4580,14 @@ private boolean checkOffsetAvailability( } } + /** + * Whether this supervisor is in a {@link SupervisorStateManager.BasicState#STOPPING} state. + */ + private boolean isStopping() + { + return stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING); + } + /** * Call {@link FutureUtils#coalesce} on the provided list, and wait for the result. */ diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index f30517d1b225..f64f390fc566 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -127,7 +127,9 @@ public SupervisorStateManager(SupervisorStateManagerConfig supervisorStateManage */ public synchronized void maybeSetState(State proposedState) { - if (BasicState.STOPPING.equals(this.supervisorState)) { + if (BasicState.STOPPING.equals(this.supervisorState) || BasicState.STOPPING.equals(proposedState)) { + // STOPPING takes precedence over all other states + supervisorState = BasicState.STOPPING; return; }