From 53209738cba1393af1e928b200d709d5b89aec73 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 6 Jun 2025 10:45:18 +0200 Subject: [PATCH 1/4] Fixes #1117: Removing offline nodes from target calculation --- .../io/kafbat/ui/service/TopicsService.java | 31 +++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 0cdae7891..6b5048018 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -26,6 +26,7 @@ import io.kafbat.ui.model.TopicCreationDTO; import io.kafbat.ui.model.TopicUpdateDTO; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -288,6 +289,18 @@ private Map> getPartitionsRea Map brokersUsage = getBrokersMap(cluster, currentAssignment); int currentReplicationFactor = topic.getReplicationFactor(); + // Get online nodes + List onlineNodes = statisticsCache.get(cluster).getClusterDescription().getNodes() + .stream().map(Node::id).toList(); + + // keep only online nodes + for (Map.Entry> parition : currentAssignment.entrySet()) { + parition.getValue().retainAll(onlineNodes); + } + + brokersUsage.keySet().retainAll(onlineNodes); + + // If we should to increase Replication factor if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) { // For each partition @@ -320,6 +333,11 @@ private Map> getPartitionsRea var partition = assignmentEntry.getKey(); var brokers = assignmentEntry.getValue(); + // Copy frpm online nodes if all nodes are offline + if (brokers.isEmpty()) { + brokers = new ArrayList<>(onlineNodes); + } + // Get brokers list sorted by usage in reverse order var brokersUsageList = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) @@ -329,19 +347,20 @@ private Map> getPartitionsRea // Iterate brokers and try to remove them from assignment // while partition replicas count != requested replication factor for (Integer broker : brokersUsageList) { + if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { + break; + } // Check is the broker the leader of partition - if (!topic.getPartitions().get(partition).getLeader() - .equals(broker)) { + Integer leader = topic.getPartitions().get(partition).getLeader(); + if (leader != null && !leader.equals(broker)) { brokers.remove(broker); brokersUsage.merge(broker, -1, Integer::sum); } - if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { - break; - } } if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) { throw new ValidationException("Something went wrong during removing replicas"); } + currentAssignment.put(partition, brokers); } } else { throw new ValidationException("Replication factor already equals requested"); @@ -374,7 +393,7 @@ private Map getBrokersMap(KafkaCluster cluster, c -> 0 )); currentAssignment.values().forEach(brokers -> brokers - .forEach(broker -> result.put(broker, result.get(broker) + 1))); + .forEach(broker -> result.put(broker, result.getOrDefault(broker, 0) + 1))); return result; } From a503a61d76d3cc35b1f7fc7cefd4ba541b6e1486 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 6 Jun 2025 10:46:52 +0200 Subject: [PATCH 2/4] Fixed typo --- api/src/main/java/io/kafbat/ui/service/TopicsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 6b5048018..3b486318f 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -333,7 +333,7 @@ private Map> getPartitionsRea var partition = assignmentEntry.getKey(); var brokers = assignmentEntry.getValue(); - // Copy frpm online nodes if all nodes are offline + // Copy from online nodes if all nodes are offline if (brokers.isEmpty()) { brokers = new ArrayList<>(onlineNodes); } From 9d6e955f15151d3a14f863bd910337a83e1ae102 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 6 Jun 2025 11:08:42 +0200 Subject: [PATCH 3/4] Fixed non leader removals --- api/src/main/java/io/kafbat/ui/service/TopicsService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 3b486318f..cb8e30a73 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -352,7 +352,7 @@ private Map> getPartitionsRea } // Check is the broker the leader of partition Integer leader = topic.getPartitions().get(partition).getLeader(); - if (leader != null && !leader.equals(broker)) { + if (leader == null || !leader.equals(broker)) { brokers.remove(broker); brokersUsage.merge(broker, -1, Integer::sum); } From 3ee6a5bedc9dfd05ce85fdb97b4f7da767400350 Mon Sep 17 00:00:00 2001 From: German Osin Date: Fri, 6 Jun 2025 11:09:56 +0200 Subject: [PATCH 4/4] Small optimization --- api/src/main/java/io/kafbat/ui/service/TopicsService.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index cb8e30a73..04dd2d593 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -344,6 +344,8 @@ private Map> getPartitionsRea .map(Map.Entry::getKey) .toList(); + Integer leader = topic.getPartitions().get(partition).getLeader(); + // Iterate brokers and try to remove them from assignment // while partition replicas count != requested replication factor for (Integer broker : brokersUsageList) { @@ -351,7 +353,6 @@ private Map> getPartitionsRea break; } // Check is the broker the leader of partition - Integer leader = topic.getPartitions().get(partition).getLeader(); if (leader == null || !leader.equals(broker)) { brokers.remove(broker); brokersUsage.merge(broker, -1, Integer::sum);