Skip to content

Commit 620a01b

Browse files
authored
KAFKA-19661 [4/N]: Prefer range-style assignment (#20486)
This is actually fixing a difference between the old and the new assignor. Given the assignment ordering, the legacy assignor has a preference for range-style assignments built in — that is, assigning C1: {0_0, 1_0}, C2: {0_1, 1_1} instead of C1: {0_0, 0_1}, C2: {1_0, 1_1}. We add tests to both assignors to check for this behavior, and improve the new assignor by enforcing corresponding orderings. Reviewers: Bill Bejeck <[email protected]>
1 parent f6f5b4c commit 620a01b

File tree

4 files changed

+283
-14
lines changed

4 files changed

+283
-14
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashMap;
2626
import java.util.HashSet;
2727
import java.util.Iterator;
28+
import java.util.LinkedList;
2829
import java.util.Map;
2930
import java.util.Optional;
3031
import java.util.PriorityQueue;
@@ -53,24 +54,23 @@ public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber
5354
}
5455

5556
private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
56-
//active
57-
final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
57+
final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
5858
assignActive(activeTasks);
5959

6060
//standby
6161
final int numStandbyReplicas =
6262
groupSpec.assignmentConfigs().isEmpty() ? 0
6363
: Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
6464
if (numStandbyReplicas > 0) {
65-
final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
65+
final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
6666
assignStandby(statefulTasks, numStandbyReplicas);
6767
}
6868

6969
return buildGroupAssignment(groupSpec.members().keySet());
7070
}
7171

72-
private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
73-
final Set<TaskId> ret = new HashSet<>();
72+
private LinkedList<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
73+
final LinkedList<TaskId> ret = new LinkedList<>();
7474
for (final String subtopology : topologyDescriber.subtopologies()) {
7575
if (isActive || topologyDescriber.isStateful(subtopology)) {
7676
final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
@@ -166,7 +166,10 @@ private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds)
166166
return ret;
167167
}
168168

169-
private void assignActive(final Set<TaskId> activeTasks) {
169+
private void assignActive(final LinkedList<TaskId> activeTasks) {
170+
171+
// Assuming our current assignment pairs same partitions (range-based), we want to sort by partition first
172+
activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
170173

171174
// 1. re-assigning existing active tasks to clients that previously had the same active tasks
172175
for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
@@ -193,6 +196,9 @@ private void assignActive(final Set<TaskId> activeTasks) {
193196
}
194197
}
195198

199+
// To achieve an initially range-based assignment, sort by subtopology
200+
activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
201+
196202
// 3. assign any remaining unassigned tasks
197203
final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
198204
processByLoad.addAll(localState.processIdToState.values());
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
296302
return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
297303
}
298304

299-
private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
305+
private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
300306
final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
301-
for (final TaskId task : standbyTasks) {
307+
308+
// Assuming our current assignment is range-based, we want to sort by partition first.
309+
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
310+
311+
for (TaskId task : standbyTasks) {
302312
for (int i = 0; i < numStandbyReplicas; i++) {
303313

304314
// prev active task
@@ -329,6 +339,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
329339
}
330340
}
331341

342+
// To achieve a range-based assignment, sort by subtopology
343+
toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
344+
.thenComparing(x -> x.taskId.partition()).reversed());
345+
332346
final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
333347
processByLoad.addAll(localState.processIdToState.values());
334348
for (final StandbyToAssign toAssign : toLeastLoaded) {

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
10911091
assertEquals(numTasks, allStandbyTasks.size());
10921092
}
10931093

1094+
@Test
1095+
public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
1096+
// Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
1097+
// Node 2 has active tasks 2,3 and standby tasks 0,1
1098+
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
1099+
mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
1100+
mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
1101+
1102+
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
1103+
mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
1104+
mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
1105+
1106+
// Node 3 joins as new client
1107+
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
1108+
1109+
final Map<String, AssignmentMemberSpec> members = mkMap(
1110+
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
1111+
1112+
final GroupAssignment result = assignor.assign(
1113+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
1114+
new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
1115+
);
1116+
1117+
// Verify all active tasks are assigned
1118+
final Set<Integer> allAssignedActiveTasks = new HashSet<>();
1119+
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
1120+
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
1121+
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
1122+
assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
1123+
1124+
// Verify all standby tasks are assigned
1125+
final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
1126+
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1"));
1127+
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2"));
1128+
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3"));
1129+
assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
1130+
1131+
// Verify each member has 1-2 active tasks and at most 3 tasks total
1132+
assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2);
1133+
assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3);
1134+
1135+
assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2);
1136+
assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);
1137+
1138+
assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
1139+
assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);
1140+
}
1141+
1142+
@Test
1143+
public void shouldRangeAssignTasksWhenScalingUp() {
1144+
// Two clients, the second one is new
1145+
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
1146+
Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", Set.of(0, 1)),
1147+
Map.of());
1148+
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
1149+
final Map<String, AssignmentMemberSpec> members = mkMap(
1150+
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
1151+
1152+
// Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
1153+
final GroupAssignment result = assignor.assign(
1154+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
1155+
new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
1156+
);
1157+
1158+
// Each client should get one task from each subtopology
1159+
final MemberAssignment testMember1 = result.members().get("member1");
1160+
assertNotNull(testMember1);
1161+
assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
1162+
assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());
1163+
1164+
final MemberAssignment testMember2 = result.members().get("member2");
1165+
assertNotNull(testMember2);
1166+
assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
1167+
assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());
1168+
1169+
// Verify all tasks are assigned exactly once
1170+
final Set<Integer> allSubtopology1Tasks = new HashSet<>();
1171+
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
1172+
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
1173+
assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
1174+
1175+
final Set<Integer> allSubtopology2Tasks = new HashSet<>();
1176+
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
1177+
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
1178+
assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
1179+
1180+
// Each client should get one task from each subtopology
1181+
assertNotNull(testMember1);
1182+
assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
1183+
assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());
1184+
1185+
assertNotNull(testMember2);
1186+
assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
1187+
assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
1188+
}
1189+
1190+
@Test
1191+
public void shouldRangeAssignTasksWhenStartingEmpty() {
1192+
// Two clients starting empty (no previous tasks)
1193+
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
1194+
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
1195+
final Map<String, AssignmentMemberSpec> members = mkMap(
1196+
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
1197+
1198+
// Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
1199+
final GroupAssignment result = assignor.assign(
1200+
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
1201+
new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
1202+
);
1203+
1204+
// Each client should get one task from each subtopology
1205+
final MemberAssignment testMember1 = result.members().get("member1");
1206+
assertNotNull(testMember1);
1207+
assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
1208+
assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());
1209+
1210+
final MemberAssignment testMember2 = result.members().get("member2");
1211+
assertNotNull(testMember2);
1212+
assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
1213+
assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());
1214+
1215+
// Verify all tasks are assigned exactly once
1216+
final Set<Integer> allSubtopology1Tasks = new HashSet<>();
1217+
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
1218+
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
1219+
assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
1220+
1221+
final Set<Integer> allSubtopology2Tasks = new HashSet<>();
1222+
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
1223+
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
1224+
assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
1225+
1226+
// Each client should get one task from each subtopology
1227+
assertNotNull(testMember1);
1228+
assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
1229+
assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());
1230+
1231+
assertNotNull(testMember2);
1232+
assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
1233+
assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
1234+
}
1235+
10941236

10951237
private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
10961238
int size = 0;

streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1263,6 +1263,119 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(f
12631263
}
12641264
}
12651265

1266+
@ParameterizedTest
1267+
@ValueSource(strings = {
1268+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
1269+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
1270+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
1271+
})
1272+
public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final String rackAwareStrategy) {
1273+
setUp(rackAwareStrategy);
1274+
1275+
// Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
1276+
// Node 2 has active tasks 2,3 and standby tasks 0,1
1277+
final ClientState node1 = createClientWithPreviousActiveTasks(PID_1, 1, TASK_0_0, TASK_0_1);
1278+
node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
1279+
1280+
final ClientState node2 = createClientWithPreviousActiveTasks(PID_2, 1, TASK_0_2, TASK_0_3);
1281+
node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
1282+
1283+
// Node 3 joins as new client
1284+
final ClientState node3 = createClient(PID_3, 1);
1285+
1286+
final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
1287+
assertThat(probingRebalanceNeeded, is(false));
1288+
1289+
// Verify all active tasks are assigned
1290+
final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
1291+
allAssignedActiveTasks.addAll(node1.activeTasks());
1292+
allAssignedActiveTasks.addAll(node2.activeTasks());
1293+
allAssignedActiveTasks.addAll(node3.activeTasks());
1294+
assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
1295+
1296+
// Verify all standby tasks are assigned
1297+
final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
1298+
allAssignedStandbyTasks.addAll(node1.standbyTasks());
1299+
allAssignedStandbyTasks.addAll(node2.standbyTasks());
1300+
allAssignedStandbyTasks.addAll(node3.standbyTasks());
1301+
assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
1302+
1303+
// Verify each client has 1-2 active tasks and at most 3 tasks total
1304+
assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
1305+
assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
1306+
assertThat(node1.activeTasks().size() + node1.standbyTasks().size(), lessThanOrEqualTo(3));
1307+
1308+
assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
1309+
assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
1310+
assertThat(node2.activeTasks().size() + node2.standbyTasks().size(), lessThanOrEqualTo(3));
1311+
1312+
assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
1313+
assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
1314+
assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), lessThanOrEqualTo(3));
1315+
}
1316+
1317+
@ParameterizedTest
1318+
@ValueSource(strings = {
1319+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
1320+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
1321+
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
1322+
})
1323+
public void shouldRangeAssignTasksWhenStartingEmpty(final String rackAwareStrategy) {
1324+
setUp(rackAwareStrategy);
1325+
1326+
// Two clients with capacity 1 each, starting empty (no previous tasks)
1327+
createClient(PID_1, 1);
1328+
createClient(PID_2, 1);
1329+
1330+
// Two subtopologies with 2 tasks each (4 tasks total)
1331+
final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
1332+
assertThat(probingRebalanceNeeded, is(false));
1333+
1334+
// Each client should get one active task from each subtopology
1335+
final ClientState client1 = clients.get(PID_1);
1336+
final ClientState client2 = clients.get(PID_2);
1337+
1338+
// Check that each client has one active task from subtopology 0
1339+
final long client1Subtopology0ActiveCount = client1.activeTasks().stream()
1340+
.filter(task -> task.subtopology() == 0)
1341+
.count();
1342+
final long client2Subtopology0ActiveCount = client2.activeTasks().stream()
1343+
.filter(task -> task.subtopology() == 0)
1344+
.count();
1345+
assertThat(client1Subtopology0ActiveCount, equalTo(1L));
1346+
assertThat(client2Subtopology0ActiveCount, equalTo(1L));
1347+
1348+
// Check that each client has one active task from subtopology 1
1349+
final long client1Subtopology1ActiveCount = client1.activeTasks().stream()
1350+
.filter(task -> task.subtopology() == 1)
1351+
.count();
1352+
final long client2Subtopology1ActiveCount = client2.activeTasks().stream()
1353+
.filter(task -> task.subtopology() == 1)
1354+
.count();
1355+
assertThat(client1Subtopology1ActiveCount, equalTo(1L));
1356+
assertThat(client2Subtopology1ActiveCount, equalTo(1L));
1357+
1358+
// Check that each client has one standby task from subtopology 0
1359+
final long client1Subtopology0StandbyCount = client1.standbyTasks().stream()
1360+
.filter(task -> task.subtopology() == 0)
1361+
.count();
1362+
final long client2Subtopology0StandbyCount = client2.standbyTasks().stream()
1363+
.filter(task -> task.subtopology() == 0)
1364+
.count();
1365+
assertThat(client1Subtopology0StandbyCount, equalTo(1L));
1366+
assertThat(client2Subtopology0StandbyCount, equalTo(1L));
1367+
1368+
// Check that each client has one standby task from subtopology 1
1369+
final long client1Subtopology1StandbyCount = client1.standbyTasks().stream()
1370+
.filter(task -> task.subtopology() == 1)
1371+
.count();
1372+
final long client2Subtopology1StandbyCount = client2.standbyTasks().stream()
1373+
.filter(task -> task.subtopology() == 1)
1374+
.count();
1375+
assertThat(client1Subtopology1StandbyCount, equalTo(1L));
1376+
assertThat(client2Subtopology1StandbyCount, equalTo(1L));
1377+
}
1378+
12661379
private boolean assign(final String rackAwareStrategy, final TaskId... tasks) {
12671380
return assign(0, rackAwareStrategy, tasks);
12681381
}

0 commit comments

Comments (0)