Skip to content

Commit 3512038

Browse files
authored
KAFKA-19661 [5/N]: Use below-quota as a condition for standby task assignment (#20458)
In the original algorithm, standby tasks are assigned to a process that previously owned the task only if it is “load-balanced”, meaning the process has fewer tasks that members, or it is the least loaded process. This is strong requirement, and will cause standby tasks to often not get assigned to process that previously owned it. Furthermore, the condition “is the least loaded process” is hard to evaluate efficiently in this context. We propose to instead use the same “below-quota” condition as in active task assignment. We compute a quota for active and standby tasks, by definiing numOfTasks = numberOfActiveTasks+numberOfStandbyTasks and defining the quota as numOfTasks/numberOfMembers rounded up. Whenever a member becomes “full” (its assigned number of tasks is equal to numOfTasks) we deduct its tasks from numOfTasks and decrement numberOfMembers and recompute the quota, which means that the quota may be reduced by one during the assignment process, to avoid uneven assignments. A standby task can be assigned to a process that previously owned it, whenever the process has fewer than numOfMembersOfProcess*quota. This condition will, again, prioritize standby stickyness, and can be evaluated in constant time. In our worst-performing benchmark, this improves the runtime by 2.5x on top of the previous optimizations, but 5x on the more important incremental assignment case. Reviewers: Bill Bejeck <[email protected]>
1 parent 709c5fa commit 3512038

File tree

3 files changed

+138
-60
lines changed

3 files changed

+138
-60
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ public class ProcessState {
3232
private int capacity;
3333
private double load;
3434
private int taskCount;
35-
private int activeTaskCount;
3635
private final Map<String, Integer> memberToTaskCounts;
3736
private final Map<String, Set<TaskId>> assignedActiveTasks;
3837
private final Map<String, Set<TaskId>> assignedStandbyTasks;
@@ -65,10 +64,6 @@ public Map<String, Integer> memberToTaskCounts() {
6564
return memberToTaskCounts;
6665
}
6766

68-
public int activeTaskCount() {
69-
return activeTaskCount;
70-
}
71-
7267
public Set<TaskId> assignedActiveTasks() {
7368
return assignedActiveTasks.values().stream()
7469
.flatMap(Set::stream)
@@ -93,7 +88,6 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
9388
taskCount += 1;
9489
assignedTasks.add(taskId);
9590
if (isActive) {
96-
activeTaskCount += 1;
9791
assignedActiveTasks.putIfAbsent(memberId, new HashSet<>());
9892
assignedActiveTasks.get(memberId).add(taskId);
9993
} else {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java

Lines changed: 76 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,9 @@ private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescri
5757
final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
5858
assignActive(activeTasks);
5959

60-
//standby
61-
final int numStandbyReplicas =
62-
groupSpec.assignmentConfigs().isEmpty() ? 0
63-
: Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
64-
if (numStandbyReplicas > 0) {
60+
if (localState.numStandbyReplicas > 0) {
6561
final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
66-
assignStandby(statefulTasks, numStandbyReplicas);
62+
assignStandby(statefulTasks);
6763
}
6864

6965
return buildGroupAssignment(groupSpec.members().keySet());
@@ -84,13 +80,24 @@ private LinkedList<TaskId> taskIds(final TopologyDescriber topologyDescriber, fi
8480

8581
private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
8682
localState = new LocalState();
87-
localState.allTasks = 0;
83+
localState.numStandbyReplicas =
84+
groupSpec.assignmentConfigs().isEmpty() ? 0
85+
: Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
86+
87+
// Helpers for computing active tasks per member, and tasks per member
88+
localState.totalActiveTasks = 0;
89+
localState.totalTasks = 0;
8890
for (final String subtopology : topologyDescriber.subtopologies()) {
8991
final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
90-
localState.allTasks += numberOfPartitions;
92+
localState.totalTasks += numberOfPartitions;
93+
localState.totalActiveTasks += numberOfPartitions;
94+
if (topologyDescriber.isStateful(subtopology))
95+
localState.totalTasks += numberOfPartitions * localState.numStandbyReplicas;
9196
}
92-
localState.totalCapacity = groupSpec.members().size();
93-
localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
97+
localState.totalMembersWithActiveTaskCapacity = groupSpec.members().size();
98+
localState.totalMembersWithTaskCapacity = groupSpec.members().size();
99+
localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
100+
localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
94101

95102
localState.processIdToState = new HashMap<>();
96103
localState.activeTaskToPrevMember = new HashMap<>();
@@ -175,11 +182,13 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
175182
for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
176183
final TaskId task = it.next();
177184
final Member prevMember = localState.activeTaskToPrevMember.get(task);
178-
if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
185+
if (prevMember != null) {
179186
final ProcessState processState = localState.processIdToState.get(prevMember.processId);
180-
processState.addTask(prevMember.memberId, task, true);
181-
maybeUpdateTasksPerMember(processState.activeTaskCount());
182-
it.remove();
187+
if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
188+
processState.addTask(prevMember.memberId, task, true);
189+
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
190+
it.remove();
191+
}
183192
}
184193
}
185194

@@ -188,11 +197,13 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
188197
final TaskId task = it.next();
189198
final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
190199
final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
191-
if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
200+
if (prevMember != null) {
192201
final ProcessState processState = localState.processIdToState.get(prevMember.processId);
193-
processState.addTask(prevMember.memberId, task, true);
194-
maybeUpdateTasksPerMember(processState.activeTaskCount());
195-
it.remove();
202+
if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
203+
processState.addTask(prevMember.memberId, task, true);
204+
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
205+
it.remove();
206+
}
196207
}
197208
}
198209

@@ -206,24 +217,32 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
206217
final TaskId task = it.next();
207218
final ProcessState processWithLeastLoad = processByLoad.poll();
208219
if (processWithLeastLoad == null) {
209-
throw new TaskAssignorException("No process available to assign active task {}." + task);
220+
throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
210221
}
211222
final String member = memberWithLeastLoad(processWithLeastLoad);
212223
if (member == null) {
213-
throw new TaskAssignorException("No member available to assign active task {}." + task);
224+
throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
214225
}
215226
processWithLeastLoad.addTask(member, task, true);
216227
it.remove();
217-
maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount());
228+
maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
218229
processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state
219230
}
220231
}
221232

222-
private void maybeUpdateTasksPerMember(final int activeTasksNo) {
223-
if (activeTasksNo == localState.tasksPerMember) {
224-
localState.totalCapacity--;
225-
localState.allTasks -= activeTasksNo;
226-
localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
233+
private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
234+
if (activeTasksNo == localState.activeTasksPerMember) {
235+
localState.totalMembersWithActiveTaskCapacity--;
236+
localState.totalActiveTasks -= activeTasksNo;
237+
localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
238+
}
239+
}
240+
241+
private void maybeUpdateTasksPerMember(final int taskNo) {
242+
if (taskNo == localState.tasksPerMember) {
243+
localState.totalMembersWithTaskCapacity--;
244+
localState.totalTasks -= taskNo;
245+
localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
227246
}
228247
}
229248

@@ -298,43 +317,49 @@ private String memberWithLeastLoad(final ProcessState processWithLeastLoad) {
298317
return memberWithLeastLoad.orElse(null);
299318
}
300319

301-
private boolean hasUnfulfilledQuota(final Member member) {
302-
return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
320+
private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, final Member member) {
321+
return process.memberToTaskCounts().get(member.memberId) < localState.activeTasksPerMember;
303322
}
304323

305-
private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
306-
final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
324+
private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
325+
return process.memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
326+
}
307327

328+
private void assignStandby(final LinkedList<TaskId> standbyTasks) {
329+
final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * localState.numStandbyReplicas);
330+
308331
// Assuming our current assignment is range-based, we want to sort by partition first.
309332
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
310333

311334
for (TaskId task : standbyTasks) {
312-
for (int i = 0; i < numStandbyReplicas; i++) {
335+
for (int i = 0; i < localState.numStandbyReplicas; i++) {
313336

314337
// prev active task
315-
final Member prevMember = localState.activeTaskToPrevMember.get(task);
316-
if (prevMember != null) {
317-
final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember.processId);
318-
if (!prevMemberProcessState.hasTask(task) && isLoadBalanced(prevMemberProcessState)) {
319-
prevMemberProcessState.addTask(prevMember.memberId, task, false);
338+
final Member prevActiveMember = localState.activeTaskToPrevMember.get(task);
339+
if (prevActiveMember != null) {
340+
final ProcessState prevActiveMemberProcessState = localState.processIdToState.get(prevActiveMember.processId);
341+
if (!prevActiveMemberProcessState.hasTask(task) && hasUnfulfilledTaskQuota(prevActiveMemberProcessState, prevActiveMember)) {
342+
prevActiveMemberProcessState.addTask(prevActiveMember.memberId, task, false);
343+
maybeUpdateTasksPerMember(prevActiveMemberProcessState.memberToTaskCounts().get(prevActiveMember.memberId));
320344
continue;
321345
}
322346
}
323347

324348
// prev standby tasks
325-
final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
326-
if (prevMembers != null && !prevMembers.isEmpty()) {
327-
final Member prevMember2 = findPrevMemberWithLeastLoad(prevMembers, task);
328-
if (prevMember2 != null) {
329-
final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember2.processId);
330-
if (isLoadBalanced(prevMemberProcessState)) {
331-
prevMemberProcessState.addTask(prevMember2.memberId, task, false);
349+
final ArrayList<Member> prevStandbyMembers = localState.standbyTaskToPrevMember.get(task);
350+
if (prevStandbyMembers != null && !prevStandbyMembers.isEmpty()) {
351+
final Member prevStandbyMember = findPrevMemberWithLeastLoad(prevStandbyMembers, task);
352+
if (prevStandbyMember != null) {
353+
final ProcessState prevStandbyMemberProcessState = localState.processIdToState.get(prevStandbyMember.processId);
354+
if (hasUnfulfilledTaskQuota(prevStandbyMemberProcessState, prevStandbyMember)) {
355+
prevStandbyMemberProcessState.addTask(prevStandbyMember.memberId, task, false);
356+
maybeUpdateTasksPerMember(prevStandbyMemberProcessState.memberToTaskCounts().get(prevStandbyMember.memberId));
332357
continue;
333358
}
334359
}
335360
}
336361

337-
toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i));
362+
toLeastLoaded.add(new StandbyToAssign(task, localState.numStandbyReplicas - i));
338363
break;
339364
}
340365
}
@@ -350,7 +375,7 @@ private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandby
350375
if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) {
351376
log.warn("{} There is not enough available capacity. " +
352377
"You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
353-
errorMessage(numStandbyReplicas, i, toAssign.taskId));
378+
errorMessage(localState.numStandbyReplicas, i, toAssign.taskId));
354379
break;
355380
}
356381
}
@@ -362,13 +387,6 @@ private String errorMessage(final int numStandbyReplicas, final int i, final Tas
362387
" of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
363388
}
364389

365-
private boolean isLoadBalanced(final ProcessState process) {
366-
final double load = process.load();
367-
final boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
368-
.allMatch(p -> p.load() >= load);
369-
return process.hasCapacity() || isLeastLoadedProcess;
370-
}
371-
372390
private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
373391
if (numberOfMembers == 0) {
374392
return 0;
@@ -406,8 +424,12 @@ private static class LocalState {
406424
Map<TaskId, ArrayList<Member>> standbyTaskToPrevMember;
407425
Map<String, ProcessState> processIdToState;
408426

409-
int allTasks;
410-
int totalCapacity;
427+
int numStandbyReplicas;
428+
int totalActiveTasks;
429+
int totalTasks;
430+
int totalMembersWithActiveTaskCapacity;
431+
int totalMembersWithTaskCapacity;
432+
int activeTasksPerMember;
411433
int tasksPerMember;
412434
}
413435
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,68 @@ public void shouldRangeAssignTasksWhenStartingEmpty() {
12331233
assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
12341234
}
12351235

1236+
@Test
1237+
public void shouldAssignStandbyTaskToPreviousOwnerBasedOnBelowQuotaCondition() {
1238+
// This test demonstrates the change from "load-balanced" to "below-quota" condition for standby assignment.
1239+
// We create a scenario where:
1240+
// - Process1 previously owned standby task 1 and currently has 1 active task (task 0)
1241+
// - Process2 currently has 1 active task (task 1)
1242+
// - Process3 has no previous tasks (least loaded after assignment)
1243+
//
1244+
// Under the old "load-balanced" algorithm: Process1 might not get standby task 1 because
1245+
// Process3 could be considered least loaded.
1246+
//
1247+
// Under the new "below-quota" algorithm: Process1 gets standby task 1 because
1248+
// it previously owned it AND is below quota.
1249+
1250+
// Set up previous task assignments:
1251+
// Process1: active=[0], standby=[1] (previously had both active and standby tasks)
1252+
// Process2: active=[1] (had the active task that process1 had as standby)
1253+
// Process3: no previous tasks
1254+
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
1255+
mkMap(mkEntry("test-subtopology", Sets.newSet(0))),
1256+
mkMap(mkEntry("test-subtopology", Sets.newSet(1))));
1257+
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
1258+
mkMap(mkEntry("test-subtopology", Sets.newSet(1))),
1259+
Map.of());
1260+
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
1261+
1262+
final Map<String, AssignmentMemberSpec> members = mkMap(
1263+
mkEntry("member1", memberSpec1),
1264+
mkEntry("member2", memberSpec2),
1265+
mkEntry("member3", memberSpec3));
1266+
1267+
// We have 2 active tasks + 1 standby replica = 4 total tasks
1268+
// Quota per process = 4 tasks / 3 processes = 1.33 -> 2 tasks per process
1269+
final GroupAssignment result = assignor.assign(
1270+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
1271+
new TopologyDescriberImpl(2, true, List.of("test-subtopology"))
1272+
);
1273+
1274+
// Verify that process1 gets the standby task 1 that it previously owned
1275+
// This should work under the new "below-quota" algorithm since process1 has only 1 active task
1276+
// which is below the quota of 2 tasks per process
1277+
final MemberAssignment member1 = result.members().get("member1");
1278+
assertNotNull(member1);
1279+
1280+
// Member1 should retain its active task 0
1281+
assertTrue(member1.activeTasks().get("test-subtopology").contains(0));
1282+
1283+
// Member1 should get standby task 1 because it previously owned it and is below quota
1284+
assertNotNull(member1.standbyTasks().get("test-subtopology"), "Member1 should have standby tasks assigned");
1285+
assertTrue(member1.standbyTasks().get("test-subtopology").contains(1),
1286+
"Member1 should have standby task 1, but has: " + member1.standbyTasks().get("test-subtopology"));
1287+
1288+
// Verify that member1 doesn't have active task 1 (standby can't be same as active)
1289+
assertFalse(member1.activeTasks().get("test-subtopology").contains(1));
1290+
1291+
// Verify the process1's total task count is at or below quota
1292+
int member1ActiveCount = member1.activeTasks().get("test-subtopology").size();
1293+
int member1StandbyCount = member1.standbyTasks().get("test-subtopology").size();
1294+
int member1TotalTasks = member1ActiveCount + member1StandbyCount;
1295+
assertTrue(member1TotalTasks <= 2, "Member1 should have <= 2 total tasks (quota), but has " + member1TotalTasks);
1296+
}
1297+
12361298

12371299
private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
12381300
int size = 0;

0 commit comments

Comments
 (0)