2626import io .kafbat .ui .model .TopicCreationDTO ;
2727import io .kafbat .ui .model .TopicUpdateDTO ;
2828import java .time .Duration ;
29+ import java .util .ArrayList ;
2930import java .util .Collection ;
3031import java .util .Collections ;
3132import java .util .Comparator ;
@@ -288,6 +289,18 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
288289 Map <Integer , Integer > brokersUsage = getBrokersMap (cluster , currentAssignment );
289290 int currentReplicationFactor = topic .getReplicationFactor ();
290291
292+ // Get online nodes
293+ List <Integer > onlineNodes = statisticsCache .get (cluster ).getClusterDescription ().getNodes ()
294+ .stream ().map (Node ::id ).toList ();
295+
296+ // keep only online nodes
297+ for (Map .Entry <Integer , List <Integer >> parition : currentAssignment .entrySet ()) {
298+ parition .getValue ().retainAll (onlineNodes );
299+ }
300+
301+ brokersUsage .keySet ().retainAll (onlineNodes );
302+
303+
291304 // If we should to increase Replication factor
292305 if (replicationFactorChange .getTotalReplicationFactor () > currentReplicationFactor ) {
293306 // For each partition
@@ -320,6 +333,11 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
320333 var partition = assignmentEntry .getKey ();
321334 var brokers = assignmentEntry .getValue ();
322335
336+ // Copy frpm online nodes if all nodes are offline
337+ if (brokers .isEmpty ()) {
338+ brokers = new ArrayList <>(onlineNodes );
339+ }
340+
323341 // Get brokers list sorted by usage in reverse order
324342 var brokersUsageList = brokersUsage .entrySet ().stream ()
325343 .sorted (Map .Entry .comparingByValue (Comparator .reverseOrder ()))
@@ -329,19 +347,20 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
329347 // Iterate brokers and try to remove them from assignment
330348 // while partition replicas count != requested replication factor
331349 for (Integer broker : brokersUsageList ) {
350+ if (brokers .size () == replicationFactorChange .getTotalReplicationFactor ()) {
351+ break ;
352+ }
332353 // Check is the broker the leader of partition
333- if (! topic .getPartitions ().get (partition ).getLeader ()
334- .equals (broker )) {
354+ Integer leader = topic .getPartitions ().get (partition ).getLeader ();
355+ if ( leader != null && ! leader .equals (broker )) {
335356 brokers .remove (broker );
336357 brokersUsage .merge (broker , -1 , Integer ::sum );
337358 }
338- if (brokers .size () == replicationFactorChange .getTotalReplicationFactor ()) {
339- break ;
340- }
341359 }
342360 if (brokers .size () != replicationFactorChange .getTotalReplicationFactor ()) {
343361 throw new ValidationException ("Something went wrong during removing replicas" );
344362 }
363+ currentAssignment .put (partition , brokers );
345364 }
346365 } else {
347366 throw new ValidationException ("Replication factor already equals requested" );
@@ -374,7 +393,7 @@ private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
374393 c -> 0
375394 ));
376395 currentAssignment .values ().forEach (brokers -> brokers
377- .forEach (broker -> result .put (broker , result .get (broker ) + 1 )));
396+ .forEach (broker -> result .put (broker , result .getOrDefault (broker , 0 ) + 1 )));
378397
379398 return result ;
380399 }
0 commit comments