 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
  */
 public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final int PARTS_CNT = 1024;
+    private static final int PARTS_CNT = 512;
 
     /** */
     private int backups;
@@ -189,6 +193,29 @@ public void testAffinityFilterConfigurationValidation() throws Exception {
         }
     }
 
+    /**
+     * Verifies that the distribution of partitions in one data center does not change
+     * when nodes leave another data center or when that DC goes down completely.
+     * In other words, no rebalance is triggered in one DC by topology changes in another.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNoRebalanceInOneDcIfTopologyChangesInAnother() throws Exception {
+        dcIds = new String[] {"DC_0", "DC_1"};
+        int nodesPerDc = 3;
+        backups = 3;
+        filter = new MdcAffinityBackupFilter(dcIds.length, backups);
+        Predicate<ClusterNode> dc1NodesFilter = node -> "DC_1".equals(node.dataCenterId());
+
+        IgniteEx srv = startClusterAcrossDataCenters(dcIds, nodesPerDc);
+        awaitPartitionMapExchange();
+        Map<Integer, Set<UUID>> oldDistribution = affinityForPartitions(srv.getOrCreateCache(DEFAULT_CACHE_NAME), dc1NodesFilter);
+
+        for (int srvIdx = 0; srvIdx < 3; srvIdx++)
+            oldDistribution = verifyNoRebalancing(srvIdx, srv, oldDistribution, dc1NodesFilter);
+    }
+
     /** Starts the specified number of nodes in each DC. */
     private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) throws Exception {
         int nodeIdx = 0;
@@ -204,6 +231,51 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t
         return lastNode;
     }
 
+    /** Stops the node with the given index and checks that the filtered affinity distribution has not changed. */
+    private Map<Integer, Set<UUID>> verifyNoRebalancing(int srvIdx, IgniteEx srv, Map<Integer, Set<UUID>> oldDistribution, Predicate<ClusterNode> dc1NodesFilter) throws InterruptedException {
+        stopGrid(srvIdx);
+        awaitPartitionMapExchange();
+        IgniteCache<Integer, Integer> cache = srv.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        Map<Integer, Set<UUID>> newDistribution = affinityForPartitions(cache, dc1NodesFilter);
+
+        assertEquals(
+            String.format("Affinity distribution changed after server node %d was stopped", srvIdx),
+            oldDistribution,
+            newDistribution);
+
+        return newDistribution;
+    }
+
+    /** Collects, for each key in [0, PARTS_CNT), the ids of the primary and backup nodes that match the given DC filter. */
+    private Map<Integer, Set<UUID>> affinityForPartitions(IgniteCache<Integer, Integer> cache, Predicate<ClusterNode> dcFilter) {
+        Map<Integer, Set<UUID>> result = new HashMap<>(PARTS_CNT);
+        Affinity<Integer> aff = affinity(cache);
+
+        for (int i = 0; i < PARTS_CNT; i++) {
+            int j = i;
+
+            aff.mapKeyToPrimaryAndBackups(i)
+                .stream()
+                .filter(dcFilter)
+                .forEach(
+                    node -> result.compute(j,
+                        (k, v) -> {
+                            if (v == null) {
+                                Set<UUID> s = new HashSet<>();
+                                s.add(node.id());
+                                return s;
+                            }
+                            else {
+                                v.add(node.id());
+                                return v;
+                            }
+                        }));
+        }
+
+        return result;
+    }
+
     /**
      * Checks that copies of each partition are distributed evenly across data centers and copies are spread evenly across nodes.
      */
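
The diff above shows only the test logic; how MdcAffinityBackupFilter actually gets installed on the cache, and how nodes advertise their data-center id, is outside this excerpt. The snippet below is a minimal sketch of how such wiring could look with stock Ignite APIs. The "dc" attribute name, the getConfiguration override, and the assumption that MdcAffinityBackupFilter implements IgniteBiPredicate<ClusterNode, List<ClusterNode>> are illustrative guesses, not code from this PR; it additionally needs imports of java.util.Collections, org.apache.ignite.configuration.IgniteConfiguration, org.apache.ignite.configuration.CacheConfiguration and org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction.

    /** Sketch only: assumed node and cache wiring for the filter, not part of this PR. */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // Hypothetical user attribute carrying the node's data center id; the real test
        // presumably exposes this value through ClusterNode#dataCenterId().
        cfg.setUserAttributes(Collections.singletonMap("dc",
            dcIds[getTestIgniteInstanceIndex(igniteInstanceName) % dcIds.length]));

        // Assumption: the filter is plugged into the rendezvous affinity function, which is how
        // custom backup filters are normally applied via setAffinityBackupFilter().
        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, PARTS_CNT);
        aff.setAffinityBackupFilter(filter);

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
        ccfg.setBackups(backups);
        ccfg.setAffinity(aff);

        return cfg.setCacheConfiguration(ccfg);
    }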