Skip to content

Commit d1b160b

Browse files
KAFKA-19510: clear pendingTasksToInit on tasks clear. (#20646)
Clear pendingTasksToInit on tasks clear. It matters in situations when we shutting down a thread in PARTITIONS_ASSIGNED state. In this case we may have locked some unassigned task directories (see TaskManager#tryToLockAllNonEmptyTaskDirectories). Then we may have gotten assigned to one or multiple of those tasks. In this scenario, we will not release the locks for the unassigned task directories (see TaskManager#releaseLockedUnassignedTaskDirectories), because TaskManager#allTasks includes pendingTasksToInit, but it hasn't been cleared. Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy <[email protected]>
1 parent 25de320 commit d1b160b

File tree

2 files changed

+34
-0
lines changed
  • streams/src
    • main/java/org/apache/kafka/streams/processor/internals
    • test/java/org/apache/kafka/streams/processor/internals

2 files changed

+34
-0
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,9 @@ private void removePartitionsForActiveTask(final TaskId taskId) {
258258

259259
@Override
260260
public synchronized void clear() {
261+
pendingTasksToInit.clear();
262+
pendingActiveTasksToCreate.clear();
263+
pendingStandbyTasksToCreate.clear();
261264
activeTasksPerId.clear();
262265
standbyTasksPerId.clear();
263266
activeTasksPerPartition.clear();
@@ -346,4 +349,12 @@ public synchronized Map<TaskId, Task> allTasksPerId() {
346349
public boolean contains(final TaskId taskId) {
347350
return getTask(taskId) != null;
348351
}
352+
353+
Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate() {
354+
return pendingActiveTasksToCreate;
355+
}
356+
357+
Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate() {
358+
return pendingStandbyTasksToCreate;
359+
}
349360
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,4 +207,27 @@ public void shouldClearFailedTask() {
207207
tasks.addTask(activeTask1);
208208
assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
209209
}
210+
211+
@Test
212+
public void shouldClearAllPendingTasks() {
213+
final StandbyTask task = standbyTask(TASK_0_0, Set.of(TOPIC_PARTITION_B_0))
214+
.inState(State.CREATED).build();
215+
tasks.addPendingTasksToInit(Collections.singleton(task));
216+
final TaskId taskId1 = new TaskId(0, 0, "A");
217+
tasks.addPendingActiveTasksToCreate(mkMap(
218+
mkEntry(taskId1, Set.of(TOPIC_PARTITION_A_0))
219+
));
220+
final TaskId taskId2 = new TaskId(0, 1, "A");
221+
tasks.addPendingStandbyTasksToCreate(mkMap(
222+
mkEntry(taskId2, Set.of(TOPIC_PARTITION_A_0))
223+
));
224+
225+
assertTrue(tasks.pendingTasksToInit().contains(task));
226+
assertTrue(tasks.pendingActiveTasksToCreate().containsKey(taskId1));
227+
assertTrue(tasks.pendingStandbyTasksToCreate().containsKey(taskId2));
228+
tasks.clear();
229+
assertTrue(tasks.pendingTasksToInit().isEmpty());
230+
assertTrue(tasks.pendingActiveTasksToCreate().isEmpty());
231+
assertTrue(tasks.pendingStandbyTasksToCreate().isEmpty());
232+
}
210233
}

0 commit comments

Comments
 (0)