Skip to content

Commit c90e97a

Browse files
m1a2stchia7712
authored and committed
KAFKA-20023 Fix kafka-reassign-partitions.sh to handle dead brokers (#21222)
Return Node objects instead of replica ids, and use Node#isEmpty() to filter out empty (unavailable) nodes before querying replica log dirs.

Test Result:

Broker 4 is alive
```
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --topics-to-move-json-file tmp.json --broker-list "2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","/tmp/kraft-broker-logs-2"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["/tmp/kraft-broker-logs-2","/tmp/kraft-broker-logs"]}]}
```

Broker 4 shutdown
```
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092 --topics-to-move-json-file tmp.json --broker-list "2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","any"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["any","/tmp/kraft-broker-logs"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]}
```

Reviewers: PoAn Yang <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 4a57c6d commit c90e97a

File tree

3 files changed

+164
-52
lines changed

3 files changed

+164
-52
lines changed

tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java

Lines changed: 81 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -565,15 +565,16 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List
565565
List<Integer> brokersToReassign = t0.getKey();
566566
List<String> topicsToReassign = t0.getValue();
567567

568-
Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
568+
Map<TopicPartition, List<Node>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
569569
Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, currentAssignments);
570570
List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
571-
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, usableBrokers);
571+
Map<TopicPartition, List<Integer>> currentParts = toReplicaIds(currentAssignments);
572+
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentParts, usableBrokers);
572573
System.out.printf("Current partition replica assignment%n%s%n%n",
573-
formatAsReassignmentJson(currentAssignments, currentReplicaLogDirs));
574+
formatAsReassignmentJson(currentParts, currentReplicaLogDirs));
574575
System.out.printf("Proposed partition reassignment configuration%n%s%n",
575576
formatAsReassignmentJson(proposedAssignments, Map.of()));
576-
return Map.entry(proposedAssignments, currentAssignments);
577+
return Map.entry(proposedAssignments, currentParts);
577578
}
578579

579580
/**
@@ -642,14 +643,14 @@ private static Map<String, TopicDescription> describeTopics(Admin adminClient,
642643
* @return A map from partitions to broker assignments.
643644
* If any topic can't be found, an exception will be thrown.
644645
*/
645-
static Map<TopicPartition, List<Integer>> getReplicaAssignmentForTopics(Admin adminClient,
646-
List<String> topics
646+
static Map<TopicPartition, List<Node>> getReplicaAssignmentForTopics(Admin adminClient,
647+
List<String> topics
647648
) throws ExecutionException, InterruptedException {
648-
Map<TopicPartition, List<Integer>> res = new HashMap<>();
649+
Map<TopicPartition, List<Node>> res = new HashMap<>();
649650
describeTopics(adminClient, new HashSet<>(topics)).forEach((topicName, topicDescription) ->
650651
topicDescription.partitions().forEach(info -> res.put(
651652
new TopicPartition(topicName, info.partition()),
652-
info.replicas().stream().map(Node::id).collect(Collectors.toList())
653+
info.replicas()
653654
)
654655
));
655656
return res;
@@ -663,15 +664,15 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForTopics(Admin ad
663664
* @return A map from partitions to broker assignments.
664665
* If any topic or partition can't be found, an exception will be thrown.
665666
*/
666-
static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient,
667-
Set<TopicPartition> partitions
667+
static Map<TopicPartition, List<Node>> getReplicasForPartitions(Admin adminClient,
668+
Set<TopicPartition> partitions
668669
) throws ExecutionException, InterruptedException {
669-
Map<TopicPartition, List<Integer>> res = new HashMap<>();
670+
Map<TopicPartition, List<Node>> res = new HashMap<>();
670671
describeTopics(adminClient, partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet())).forEach((topicName, topicDescription) ->
671672
topicDescription.partitions().forEach(info -> {
672673
TopicPartition tp = new TopicPartition(topicName, info.partition());
673674
if (partitions.contains(tp))
674-
res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList()));
675+
res.put(tp, info.replicas());
675676
})
676677
);
677678

@@ -684,6 +685,17 @@ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admi
684685
return res;
685686
}
686687

688+
static Map<TopicPartition, List<Integer>> toReplicaIds(
689+
Map<TopicPartition, List<Node>> replicaAssignmentForPartitions
690+
) {
691+
return replicaAssignmentForPartitions.entrySet()
692+
.stream()
693+
.collect(Collectors.toMap(
694+
Entry::getKey,
695+
e -> e.getValue().stream().map(Node::id).collect(Collectors.toList())
696+
));
697+
}
698+
687699
/**
688700
* Find the rack information for some brokers.
689701
*
@@ -775,8 +787,10 @@ public static void executeAssignment(Admin adminClient,
775787
proposedParts.values().forEach(brokers::addAll);
776788

777789
verifyBrokerIds(adminClient, brokers);
778-
Map<TopicPartition, List<Integer>> currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
779-
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, proposedParts, currentParts));
790+
Map<TopicPartition, List<Node>> currentPartsToNode = getReplicasForPartitions(adminClient, proposedParts.keySet());
791+
Map<TopicPartition, List<Integer>> currentParts = toReplicaIds(currentPartsToNode);
792+
793+
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, proposedParts, currentPartsToNode));
780794

781795
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
782796
System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -919,21 +933,31 @@ private static void verifyBrokerIds(Admin adminClient, Set<Integer> brokers) thr
919933
*
920934
* @param adminClient The admin client object to use.
921935
* @param proposedParts The proposed partition assignment.
922-
* @param currentParts The current partition assignment.
936+
* @param currentAssignments The current partition assignment with Node information.
923937
*
924938
* @return The string to print. We will only print information about
925939
* partitions that appear in the proposed partition assignment.
926940
*/
927-
static String currentPartitionReplicaAssignmentToString(Admin adminClient,
928-
Map<TopicPartition, List<Integer>> proposedParts,
929-
Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException, ExecutionException, InterruptedException {
930-
Map<TopicPartition, List<Integer>> partitionsToBeReassigned = currentParts.entrySet().stream()
941+
static String currentPartitionReplicaAssignmentToString(
942+
Admin adminClient,
943+
Map<TopicPartition, List<Integer>> proposedParts,
944+
Map<TopicPartition, List<Node>> currentAssignments
945+
) throws JsonProcessingException, ExecutionException, InterruptedException {
946+
947+
Map<TopicPartition, List<Node>> partitionsToBeReassigned = currentAssignments.entrySet()
948+
.stream()
931949
.filter(e -> proposedParts.containsKey(e.getKey()))
932950
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
933-
Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, partitionsToBeReassigned);
951+
952+
Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(
953+
adminClient,
954+
partitionsToBeReassigned
955+
);
956+
957+
Map<TopicPartition, List<Integer>> currentParts = toReplicaIds(partitionsToBeReassigned);
934958

935959
return String.format("Current partition replica assignment%n%n%s%n%nSave this to use as the %s",
936-
formatAsReassignmentJson(partitionsToBeReassigned, currentReplicaLogDirs),
960+
formatAsReassignmentJson(currentParts, currentReplicaLogDirs),
937961
"--reassignment-json-file option during rollback");
938962
}
939963

@@ -1519,25 +1543,48 @@ static Set<TopicPartitionReplica> alterReplicaLogDirs(Admin adminClient,
15191543
return results;
15201544
}
15211545

1546+
/**
1547+
* Get the log directory for each replica.
1548+
*
1549+
* @param adminClient The admin client object to use.
1550+
* @param current The current partition assignment with Node information.
1551+
* @return Map of TopicPartitionReplica to log directory path.
1552+
*/
15221553
static Map<TopicPartitionReplica, String> getReplicaToLogDir(
15231554
Admin adminClient,
1524-
Map<TopicPartition, List<Integer>> topicPartitionToReplicas
1525-
) throws InterruptedException, ExecutionException {
1526-
var replicaLogDirs = topicPartitionToReplicas
1527-
.entrySet()
1528-
.stream()
1529-
.flatMap(entry -> entry.getValue()
1530-
.stream()
1531-
.map(id -> new TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
1532-
.collect(Collectors.toUnmodifiableSet());
1555+
Map<TopicPartition, List<Node>> current
1556+
) throws ExecutionException, InterruptedException {
1557+
List<TopicPartitionReplica> availableReplicas = available(current);
1558+
1559+
if (availableReplicas.isEmpty()) {
1560+
return Map.of();
1561+
}
15331562

1534-
return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
1563+
return adminClient.describeReplicaLogDirs(availableReplicas).all().get()
15351564
.entrySet()
15361565
.stream()
1537-
.filter(entry -> entry.getValue().getCurrentReplicaLogDir() != null)
1566+
.filter(e -> e.getValue().getCurrentReplicaLogDir() != null)
15381567
.collect(Collectors.toMap(
15391568
Entry::getKey,
1540-
entry -> entry.getValue().getCurrentReplicaLogDir()
1541-
));
1569+
e -> e.getValue().getCurrentReplicaLogDir())
1570+
);
1571+
}
1572+
1573+
/**
1574+
* Extract available (non-empty) replicas from the assignment.
1575+
*/
1576+
private static List<TopicPartitionReplica> available(Map<TopicPartition, List<Node>> current) {
1577+
return current.entrySet()
1578+
.stream()
1579+
.flatMap(entry -> entry.getValue()
1580+
.stream()
1581+
.filter(node -> !node.isEmpty())
1582+
.map(node -> new TopicPartitionReplica(
1583+
entry.getKey().topic(),
1584+
entry.getKey().partition(),
1585+
node.id()
1586+
))
1587+
)
1588+
.toList();
15421589
}
15431590
}

tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,49 @@ public void testDisallowReplicationFactorChange() {
502502
}
503503
}
504504

505+
@ClusterTest(types = {Type.KRAFT})
506+
public void testGenerateAssignmentWithOneBootstrapServerShutdownWontTimeout() throws Exception {
507+
var brokerIdToShutdown = 0;
508+
createTopics();
509+
var foo0 = new TopicPartition("foo", 0);
510+
produceMessages(foo0.topic(), foo0.partition(), 100);
511+
512+
try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
513+
String topicsToMoveJson = """
514+
{
515+
"topics": [
516+
{ "topic": "foo" }
517+
],
518+
"version": 1
519+
}
520+
""";
521+
clusterInstance.shutdownBroker(brokerIdToShutdown);
522+
TestUtils.waitForCondition(
523+
() -> clusterInstance.aliveBrokers().size() == 4,
524+
"Waiting for broker to shutdown failed"
525+
);
526+
generateAssignment(admin, topicsToMoveJson, "1,2,3", false);
527+
}
528+
}
529+
530+
@ClusterTest(types = {Type.KRAFT})
531+
public void testExecuteAssignmentWithOneBootstrapServerShutdownWontTimeout() throws Exception {
532+
var brokerIdToShutdown = 0;
533+
createTopics();
534+
var foo0 = new TopicPartition("foo", 0);
535+
produceMessages(foo0.topic(), foo0.partition(), 100);
536+
clusterInstance.shutdownBroker(brokerIdToShutdown);
537+
TestUtils.waitForCondition(
538+
() -> clusterInstance.aliveBrokers().size() == 4,
539+
"Waiting for broker to shutdown failed"
540+
);
541+
// Execute the assignment
542+
String assignment = "{\"version\":1,\"partitions\":" +
543+
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
544+
"]}";
545+
runExecuteAssignment(false, assignment, -1L, -1L);
546+
}
547+
505548
private void createTopics() {
506549
try (Admin admin = Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
507550
Map<Integer, List<Integer>> fooReplicasAssignments = new HashMap<>();

tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findPartitionReassignmentStates;
7070
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
7171
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
72-
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
7372
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
7473
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
7574
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
@@ -79,6 +78,7 @@
7978
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
8079
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitionReassignmentStatesToString;
8180
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
81+
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.toReplicaIds;
8282
import static org.junit.jupiter.api.Assertions.assertEquals;
8383
import static org.junit.jupiter.api.Assertions.assertFalse;
8484
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -290,20 +290,30 @@ public void testGetReplicaAssignments() throws Exception {
290290
assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
291291
assignments.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
292292

293-
assertEquals(assignments, getReplicaAssignmentForTopics(adminClient, List.of("foo")));
293+
assertEquals(
294+
assignments,
295+
toReplicaIds(getReplicaAssignmentForTopics(adminClient, List.of("foo")))
296+
);
294297

295298
assignments.clear();
296299

297300
assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
298301
assignments.put(new TopicPartition("bar", 0), List.of(2, 3, 0));
299302

300-
assertEquals(assignments,
301-
getReplicaAssignmentForPartitions(adminClient, Set.of(new TopicPartition("foo", 0), new TopicPartition("bar", 0))));
303+
Map<TopicPartition, List<Integer>> actualAssignments = toReplicaIds(
304+
ReassignPartitionsCommand.getReplicasForPartitions(
305+
adminClient,
306+
Set.of(new TopicPartition("foo", 0), new TopicPartition("bar", 0))
307+
));
308+
assertEquals(
309+
assignments,
310+
actualAssignments
311+
);
302312

303313
UnknownTopicOrPartitionException exception =
304314
assertInstanceOf(UnknownTopicOrPartitionException.class,
305315
assertThrows(ExecutionException.class,
306-
() -> getReplicaAssignmentForPartitions(adminClient,
316+
() -> ReassignPartitionsCommand.getReplicasForPartitions(adminClient,
307317
Set.of(new TopicPartition("foo", 0), new TopicPartition("foo", 10)))).getCause());
308318
assertEquals("Unable to find partition: foo-10", exception.getMessage());
309319
}
@@ -451,25 +461,31 @@ public void testCurrentPartitionReplicaAssignmentToString() throws Exception {
451461
) {
452462

453463
List<Node> brokers = adminClient.brokers();
464+
Node broker1 = brokers.get(1);
465+
Node broker2 = brokers.get(2);
466+
Node broker3 = brokers.get(3);
467+
Node broker4 = brokers.get(4);
468+
Node broker5 = brokers.get(5);
469+
454470
adminClient.addTopic(false, "foo", List.of(
455-
new TopicPartitionInfo(1, brokers.get(1),
456-
List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
457-
List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
471+
new TopicPartitionInfo(1, broker1,
472+
List.of(broker1, broker2, broker3),
473+
List.of(broker1, broker2, broker3))
458474
), Map.of());
459475

460476
adminClient.addTopic(false, "bar", List.of(
461-
new TopicPartitionInfo(0, brokers.get(4),
462-
List.of(brokers.get(4), brokers.get(5)),
463-
List.of(brokers.get(4), brokers.get(5)))
477+
new TopicPartitionInfo(0, broker4,
478+
List.of(broker4, broker5),
479+
List.of(broker4, broker5))
464480
), Map.of());
465481

466482
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
467483
proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
468484
proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
469485

470-
Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
471-
currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
472-
currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
486+
Map<TopicPartition, List<Node>> currentParts = new HashMap<>();
487+
currentParts.put(new TopicPartition("foo", 1), List.of(broker1, broker2, broker3));
488+
currentParts.put(new TopicPartition("bar", 0), List.of(broker4, broker5));
473489

474490
assertEquals(String.join(System.lineSeparator(),
475491
"Current partition replica assignment",
@@ -801,10 +817,16 @@ public void testGetReplicaToLogDir() throws Exception {
801817
) {
802818
addTopics(adminClient);
803819

804-
Map<TopicPartition, List<Integer>> topicPartitionToReplicas = Map.of(
805-
new TopicPartition("foo", 0), List.of(0, 1, 2),
806-
new TopicPartition("foo", 1), List.of(1, 2, 3),
807-
new TopicPartition("bar", 0), List.of(2, 3, 0)
820+
List<Node> brokers = adminClient.brokers();
821+
Node broker0 = brokers.get(0);
822+
Node broker1 = brokers.get(1);
823+
Node broker2 = brokers.get(2);
824+
Node broker3 = brokers.get(3);
825+
826+
Map<TopicPartition, List<Node>> topicPartitionToReplicas = Map.of(
827+
new TopicPartition("foo", 0), List.of(broker0, broker1, broker2),
828+
new TopicPartition("foo", 1), List.of(broker1, broker2, broker3),
829+
new TopicPartition("bar", 0), List.of(broker2, broker3, broker0)
808830
);
809831

810832
Map<TopicPartitionReplica, String> result = getReplicaToLogDir(adminClient, topicPartitionToReplicas);

0 commit comments

Comments
 (0)