diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 8ea2d3ae65a51..5bade0eb64b0e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -556,10 +556,41 @@ public synchronized void clean() { try { if (hasPersistentStores && stateDir.exists() && !stateDir.delete()) { - log.warn( - String.format("%s Failed to delete state store directory of %s for it is not empty", - logPrefix(), stateDir.getAbsolutePath()) - ); + final File[] remainingFiles = stateDir.listFiles(); + if (remainingFiles == null) { + log.warn("{} Failed to delete state store directory of {} for it is not empty", + logPrefix(), stateDir.getAbsolutePath()); + return; + } + + boolean hasProcessFiles = false; + boolean hasNonProcessFiles = false; + + for (final File file : remainingFiles) { + final String name = file.getName(); + if (PROCESS_FILE_NAME.equals(name)) { + hasProcessFiles = true; + } else { + hasNonProcessFiles = true; + break; + } + } + + if (hasProcessFiles && !hasNonProcessFiles) { + // KAFKA-10716: The processId file is persisted in the state directory to keep the + // processId stable across restarts. Removing it would cause a new processId to be + // generated and may lead to unnecessary task movements during rebalances. + log.debug( + "{} State store directory {} was not deleted because it still contains the " + + "process metadata file {} that is required for stable task assignment across restarts.", + logPrefix(), stateDir.getAbsolutePath(), PROCESS_FILE_NAME + ); + } else { + log.warn( + String.format("%s Failed to delete state store directory of %s for it is not empty", + logPrefix(), stateDir.getAbsolutePath()) + ); + } } } catch (final SecurityException exception) { log.error(