|
20 | 20 | */ |
21 | 21 |
|
22 | 22 | import java.util.ArrayList; |
23 | | -import java.util.Collections; |
24 | 23 | import java.util.Comparator; |
25 | 24 | import java.util.HashMap; |
26 | 25 | import java.util.HashSet; |
27 | | -import java.util.Iterator; |
28 | 26 | import java.util.List; |
29 | 27 | import java.util.Map; |
30 | 28 | import java.util.PriorityQueue; |
31 | 29 | import java.util.Queue; |
32 | 30 | import java.util.Set; |
33 | | -import java.util.TreeSet; |
34 | | -import java.util.stream.Collectors; |
35 | | - |
36 | | -import org.apache.commons.math3.util.Pair; |
37 | 31 |
|
38 | 32 | import static org.apache.helix.util.VirtualTopologyUtil.computeVirtualGroupId; |
39 | 33 |
|
@@ -128,35 +122,23 @@ private void distributeUnassignedZones( |
128 | 122 |
|
129 | 123 | // Priority queue sorted by current load of the virtual group |
130 | 124 | // We always assign new zones to the group with the smallest load to keep them balanced. |
131 | | - Queue<String> minHeap = new PriorityQueue<>((o1, o2) -> { |
132 | | - int load1 = computeGroupLoad(virtualGroupToZoneMapping, o1, zoneMapping); |
133 | | - int load2 = computeGroupLoad(virtualGroupToZoneMapping, o2, zoneMapping); |
134 | | - |
135 | | - int loadDiff = Integer.compare(load1, load2); |
136 | | - |
137 | | - // If the loads are not equal, return the difference |
138 | | - if (loadDiff != 0) { |
139 | | - return loadDiff; |
140 | | - } |
141 | | - |
142 | | - // When loads are equal, sort by group name to ensure consistent ordering |
143 | | - return o1.compareTo(o2); |
144 | | - }); |
| 125 | + // If loads are equal, sort by zone name to ensure consistent ordering |
| 126 | + Queue<String> minHeap = new PriorityQueue<>( |
| 127 | + Comparator.<String>comparingInt(vg -> |
| 128 | + virtualGroupToZoneMapping.get(vg).stream() |
| 129 | + .map(zoneMapping::get) |
| 130 | + .mapToInt(Set::size) |
| 131 | + .sum() |
| 132 | + ).thenComparing(String::compareTo) |
| 133 | + ); |
145 | 134 | // Seed the min-heap with existing groups |
146 | 135 | minHeap.addAll(virtualGroupToZoneMapping.keySet()); |
147 | 136 |
|
148 | 137 | // Sort unassigned zones by descending number of unassigned instances, assigning "heavier" zones first. |
149 | | - unassignedZones.sort((o1, o2) -> { |
150 | | - int zone1Size = zoneMapping.get(o1).size(); |
151 | | - int zone2Size = zoneMapping.get(o2).size(); |
152 | | - |
153 | | - if (zone1Size != zone2Size) { |
154 | | - return Integer.compare(zone2Size, zone1Size); // Sort by size descending |
155 | | - } |
156 | | - |
157 | | - // If sizes are equal, sort by zone name to ensure consistent ordering |
158 | | - return o1.compareTo(o2); |
159 | | - }); |
| 138 | + // If sizes are equal, sort by zone name to ensure consistent ordering |
| 139 | + unassignedZones.sort(Comparator.<String>comparingInt(zone -> zoneMapping.get(zone).size()) |
| 140 | + .reversed() |
| 141 | + .thenComparing(String::compareTo)); |
160 | 142 |
|
161 | 143 | // Assign each zone to the least-loaded group |
162 | 144 | for (String zone : unassignedZones) { |
@@ -185,21 +167,4 @@ private Map<String, Set<String>> constructResult(Map<String, Set<String>> vgToZo |
185 | 167 | } |
186 | 168 | return result; |
187 | 169 | } |
188 | | - |
189 | | - /** |
190 | | - * Computes the load of a virtual group based on the number of instances across all zones in that group. |
191 | | - * |
192 | | - * @param virtualGroupToZoneMapping Mapping of virtual group -> set of zones. |
193 | | - * @param group The virtual group for which to compute the load. |
194 | | - * @param zoneMapping Mapping of zone -> set of instances. |
195 | | - * @return The total number of instances across all zones in the specified virtual group. |
196 | | - */ |
197 | | - private int computeGroupLoad( |
198 | | - Map<String, Set<String>> virtualGroupToZoneMapping, String group, |
199 | | - Map<String, Set<String>> zoneMapping) { |
200 | | - // The load of a group is defined as the total number of instances across all zones in that group. |
201 | | - return virtualGroupToZoneMapping.getOrDefault(group, Collections.emptySet()).stream() |
202 | | - .mapToInt(zone -> zoneMapping.getOrDefault(zone, Collections.emptySet()).size()) |
203 | | - .sum(); |
204 | | - } |
205 | 170 | } |
0 commit comments