Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,14 @@ private static void assignActive(final ApplicationState applicationState,
final int totalCapacity = computeTotalProcessingThreads(clients);
final Set<TaskId> allTaskIds = applicationState.allTasks().keySet();
final int taskCount = allTaskIds.size();
final int activeTasksPerThread = taskCount / totalCapacity;
final Set<TaskId> unassigned = new HashSet<>(allTaskIds);

// first try and re-assign existing active tasks to clients that previously had
// the same active task
for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) {
final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId);
if (allTaskIds.contains(taskId)) {
if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) {
if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, taskCount, totalCapacity)) {
assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE);
unassigned.remove(taskId);
}
Expand All @@ -167,7 +166,7 @@ private static void assignActive(final ApplicationState applicationState,
final TaskId taskId = iterator.next();
final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.getOrDefault(taskId, new HashSet<>());
for (final ProcessId client: previousClientsForStandbyTask) {
if (assignmentState.hasRoomForActiveTask(client, activeTasksPerThread)) {
if (assignmentState.hasRoomForActiveTask(client, taskCount, totalCapacity)) {
assignmentState.finalizeAssignment(taskId, client, AssignedTask.Type.ACTIVE);
iterator.remove();
break;
Expand Down Expand Up @@ -295,14 +294,15 @@ private void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssign
this.newAssignments = optimizedAssignments;
}

private boolean hasRoomForActiveTask(final ProcessId processId, final int activeTasksPerThread) {
private boolean hasRoomForActiveTask(final ProcessId processId, final int taskCount, final int totalCapacity) {
final int capacity = clients.get(processId).numProcessingThreads();
final int newActiveTaskCount = newAssignments.computeIfAbsent(processId, k -> KafkaStreamsAssignment.of(processId, new HashSet<>()))
.tasks().values()
.stream().filter(assignedTask -> assignedTask.type() == AssignedTask.Type.ACTIVE)
.collect(Collectors.toSet())
.size();
return newActiveTaskCount < capacity * activeTasksPerThread;
final int instanceLimit = (taskCount * capacity + totalCapacity - 1) / totalCapacity;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs a regression test that fails with the old quota math and passes with the new proportional ceiling.

On top of that, can we add a behavior test that simulates repeated StickyTaskAssignor.assign(...) rounds, and then assert that an uneven-capacity case converges in 2 rounds? That would cover the actual failure mode described in the PR, not just the arithmetic change.

Copy link
Copy Markdown
Contributor Author

@ChoMinGi ChoMinGi Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harimm
Thanks for the review! Added two tests

  • shouldRetainFairShareOfPreviousTasksWhenScalingUp — verifies that an instance keeps all 225 of its previous tasks (450 tasks / 2×10 threads). Fails with the old floor-based limit (220).
  • shouldConvergeWithinTwoRoundsWhenScalingUp — simulates repeated assign() rounds using the reporter's scenario and asserts the assignment converges within 2 rounds. Fails with the floor-based quota because the 10 leftover tasks (450 tasks vs. a 220-per-instance floor limit) keep bouncing between instances on every round.

Let me know if you'd like any adjustments.

return newActiveTaskCount < instanceLimit;
}

private ProcessId findBestClientForTask(final TaskId taskId, final Set<ProcessId> clientsWithin) {
Expand Down