2626import java .util .UUID ;
2727import java .util .function .Predicate ;
2828import java .util .stream .Collectors ;
29+ import java .util .stream .Stream ;
2930import org .apache .ignite .IgniteCache ;
3031import org .apache .ignite .IgniteCheckedException ;
3132import org .apache .ignite .IgniteSystemProperties ;
3233import org .apache .ignite .cache .affinity .Affinity ;
3334import org .apache .ignite .cluster .ClusterNode ;
35+ import org .apache .ignite .configuration .DataRegionConfiguration ;
36+ import org .apache .ignite .configuration .DataStorageConfiguration ;
3437import org .apache .ignite .configuration .IgniteConfiguration ;
3538import org .apache .ignite .internal .IgniteEx ;
3639import org .apache .ignite .lang .IgniteBiPredicate ;
3740import org .apache .ignite .testframework .junits .common .GridCommonAbstractTest ;
3841import org .junit .Test ;
3942
43+ import static org .apache .ignite .cluster .ClusterState .ACTIVE ;
4044import static org .apache .ignite .testframework .GridTestUtils .assertThrows ;
4145
4246/**
@@ -54,7 +58,10 @@ public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
5458 private String [] dcIds ;
5559
5660 /** */
57- private IgniteBiPredicate <ClusterNode , List <ClusterNode >> filter ;
61+ private IgniteBiPredicate <ClusterNode , List <ClusterNode >> backupFilter ;
62+
63+ /** */
64+ private boolean persistenceEnabled ;
5865
5966 /** {@inheritDoc} */
6067 @ Override protected IgniteConfiguration getConfiguration (String igniteInstanceName ) throws Exception {
@@ -64,7 +71,17 @@ public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
6471 .setBackups (backups )
6572 .setAffinity (
6673 new RendezvousAffinityFunction (false , PARTS_CNT )
67- .setAffinityBackupFilter (filter )));
74+ .setAffinityBackupFilter (backupFilter )));
75+
76+ if (persistenceEnabled ) {
77+ cfg .setDataStorageConfiguration (
78+ new DataStorageConfiguration ()
79+ .setDefaultDataRegionConfiguration (
80+ new DataRegionConfiguration ()
81+ .setPersistenceEnabled (true )
82+ )
83+ );
84+ }
6885
6986 return cfg ;
7087 }
@@ -74,6 +91,8 @@ public class MdcAffinityBackupFilterSelfTest extends GridCommonAbstractTest {
7491 super .afterTest ();
7592
7693 stopAllGrids ();
94+
95+ cleanPersistenceDir ();
7796 }
7897
7998 /**
@@ -103,6 +122,61 @@ public void testUniformNumberOfPartitionCopiesPerDcIsEnforced() {
103122 "recommended values are 5 and 8" );
104123 }
105124
125+ /**
126+ * Verifies that filter doesn't assing additional copies if baseline topology is not changed,
127+ * and it could lead to breaking of the guarantee `at least one copy of partition per datacenter`.
128+ * <p/>
129+ * Verifies that after baseline topology change, guarantee is restored.
130+ *
131+ * @throws Exception If failed.
132+ */
133+ @ Test
134+ public void testMdcFilterWithBaselineTopology () throws Exception {
135+ persistenceEnabled = true ;
136+ dcIds = new String [] {"DC_0" , "DC_1" };
137+ int nodesPerDc = 2 ;
138+ backups = 1 ;
139+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups );
140+
141+ IgniteEx srv = startClusterAcrossDataCenters (dcIds , nodesPerDc );
142+
143+ srv .cluster ().state (ACTIVE );
144+
145+ IgniteCache cache = srv .getOrCreateCache (DEFAULT_CACHE_NAME );
146+
147+ Map <Integer , Set <UUID >> dc1OldAffDistr = affinityForPartitions (cache , node -> node .dataCenterId ().equals (dcIds [1 ]));
148+
149+ UUID srv0Id = grid (0 ).localNode ().id ();
150+
151+ Map <Integer , Set <UUID >> dc0AffDistr = affinityForPartitions (cache , node -> node .dataCenterId ().equals (dcIds [0 ]));
152+
153+ Set <Integer > srv0Partitions = dc0AffDistr
154+ .entrySet ().stream ()
155+ .filter (entry -> entry .getValue ().contains (srv0Id ))
156+ .map (Map .Entry ::getKey )
157+ .collect (Collectors .toSet ());
158+
159+ stopGrid (0 );
160+
161+ dc0AffDistr = affinityForPartitions (cache , node -> node .dataCenterId ().equals (dcIds [0 ]));
162+
163+ // Partitions from stopped node are not assigned to any node without baseline topology change.
164+ assertFalse (dc0AffDistr .keySet ().containsAll (srv0Partitions ));
165+
166+ Map <Integer , Set <UUID >> dc1NewAffDistr = affinityForPartitions (cache , node -> node .dataCenterId ().equals (dcIds [1 ]));
167+
168+ // Assignment of partitions in remote DC has not changed.
169+ assertEquals (dc1OldAffDistr , dc1NewAffDistr );
170+
171+ srv .cluster ().setBaselineTopology (srv .cluster ().topologyVersion ());
172+ awaitPartitionMapExchange ();
173+
174+ Set <Integer > allParts = Stream .iterate (0 , i -> i + 1 ).limit (PARTS_CNT ).collect (Collectors .toSet ());
175+ dc0AffDistr = affinityForPartitions (cache , node -> node .dataCenterId ().equals (dcIds [0 ]));
176+ // After baseline topology change, guarantee is restored.
177+ assertTrue (allParts .containsAll (dc0AffDistr .keySet ()));
178+ }
179+
106180 /**
107181 * Verifies that partition copies are assigned evenly across a cluster in two data centers.
108182 * <p/>
@@ -115,25 +189,25 @@ public void test2DcDistribution() throws Exception {
115189 dcIds = new String [] {"DC_0" , "DC_1" };
116190 int nodesPerDc = 4 ;
117191 backups = 3 ;
118- filter = new MdcAffinityBackupFilter (dcIds .length , backups );
192+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups );
119193
120194 IgniteEx srv = startClusterAcrossDataCenters (dcIds , nodesPerDc );
121195
122- verifyDistributionProperties (srv , dcIds , nodesPerDc , backups );
196+ verifyDistributionGuarantees (srv , dcIds , nodesPerDc , backups );
123197
124198 //stopping one node in DC_1 should not compromise distribution as there are additional nodes in the same DC
125199 stopGrid (5 );
126200
127201 awaitPartitionMapExchange ();
128202
129- verifyDistributionProperties (srv , dcIds , nodesPerDc , backups );
203+ verifyDistributionGuarantees (srv , dcIds , nodesPerDc , backups );
130204
131205 //stopping another node in DC_1 should not compromise distribution as well
132206 stopGrid (6 );
133207
134208 awaitPartitionMapExchange ();
135209
136- verifyDistributionProperties (srv , dcIds , nodesPerDc , backups );
210+ verifyDistributionGuarantees (srv , dcIds , nodesPerDc , backups );
137211 }
138212
139213 /**
@@ -146,11 +220,11 @@ public void test3DcDistribution() throws Exception {
146220 dcIds = new String [] {"DC_0" , "DC_1" , "DC_2" };
147221 int nodesPerDc = 2 ;
148222 backups = 5 ;
149- filter = new MdcAffinityBackupFilter (dcIds .length , backups );
223+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups );
150224
151225 IgniteEx srv = startClusterAcrossDataCenters (dcIds , 2 );
152226
153- verifyDistributionProperties (srv , dcIds , nodesPerDc , backups );
227+ verifyDistributionGuarantees (srv , dcIds , nodesPerDc , backups );
154228 }
155229
156230 /**
@@ -163,10 +237,10 @@ public void test3DcDistribution() throws Exception {
163237 public void testAffinityFilterConfigurationValidation () throws Exception {
164238 dcIds = new String [] {"DC_0" , "DC_1" };
165239 backups = 3 ;
166- filter = new MdcAffinityBackupFilter (dcIds .length , backups );
240+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups );
167241 startGrid (0 );
168242
169- filter = new ClusterNodeAttributeAffinityBackupFilter ("DC_ID" );
243+ backupFilter = new ClusterNodeAttributeAffinityBackupFilter ("DC_ID" );
170244 try {
171245 startGrid (1 );
172246
@@ -180,7 +254,7 @@ public void testAffinityFilterConfigurationValidation() throws Exception {
180254 assertTrue (errMsg .contains ("Affinity backup filter class mismatch" ));
181255 }
182256
183- filter = new MdcAffinityBackupFilter (dcIds .length , backups + dcIds .length );
257+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups + dcIds .length );
184258 try {
185259 startGrid (1 );
186260
@@ -207,15 +281,21 @@ public void testNoRebalanceInOneDcIfTopologyChangesInAnother() throws Exception
207281 dcIds = new String [] {"DC_0" , "DC_1" };
208282 int nodesPerDc = 3 ;
209283 backups = 3 ;
210- filter = new MdcAffinityBackupFilter (dcIds .length , backups );
211- Predicate <ClusterNode > dc1NodesFilter = node -> "DC_1" .equals (node .dataCenterId ());
284+ backupFilter = new MdcAffinityBackupFilter (dcIds .length , backups );
285+ Predicate <ClusterNode > dc1NodesFilter = node -> dcIds [ 1 ] .equals (node .dataCenterId ());
212286
213287 IgniteEx srv = startClusterAcrossDataCenters (dcIds , nodesPerDc );
214288 awaitPartitionMapExchange ();
215289 Map <Integer , Set <UUID >> oldDistribution = affinityForPartitions (srv .getOrCreateCache (DEFAULT_CACHE_NAME ), dc1NodesFilter );
216290
217- for (int srvIdx = 0 ; srvIdx < 3 ; srvIdx ++)
218- oldDistribution = verifyNoRebalancing (srvIdx , srv , oldDistribution , dc1NodesFilter );
291+ oldDistribution = verifyNoRebalancing (0 , true , null , srv , oldDistribution , dc1NodesFilter );
292+ oldDistribution = verifyNoRebalancing (1 , true , null , srv , oldDistribution , dc1NodesFilter );
293+ // Start srv0 back in DC_0 to make sure that neither starting nor stopping a server doesn't affect
294+ // affinity distribution in remote DC.
295+ oldDistribution = verifyNoRebalancing (0 , false , dcIds [0 ], srv , oldDistribution , dc1NodesFilter );
296+ // Stop it again.
297+ oldDistribution = verifyNoRebalancing (0 , true , null , srv , oldDistribution , dc1NodesFilter );
298+ verifyNoRebalancing (2 , true , null , srv , oldDistribution , dc1NodesFilter );
219299 }
220300
221301 /** Starts specified number of nodes in each DC. */
@@ -236,11 +316,18 @@ private IgniteEx startClusterAcrossDataCenters(String[] dcIds, int nodesPerDc) t
236316 /** */
237317 private Map <Integer , Set <UUID >> verifyNoRebalancing (
238318 int srvIdx ,
319+ boolean stopSrv ,
320+ String startingSrvDcId ,
239321 IgniteEx srv ,
240322 Map <Integer , Set <UUID >> oldDistribution ,
241323 Predicate <ClusterNode > dc1NodesFilter
242- ) throws InterruptedException {
243- stopGrid (srvIdx );
324+ ) throws Exception {
325+ if (stopSrv )
326+ stopGrid (srvIdx );
327+ else {
328+ System .setProperty (IgniteSystemProperties .IGNITE_DATA_CENTER_ID , startingSrvDcId );
329+ startGrid (srvIdx );
330+ }
244331 awaitPartitionMapExchange ();
245332 IgniteCache <Integer , Integer > cache = srv .getOrCreateCache (DEFAULT_CACHE_NAME );
246333
@@ -287,7 +374,7 @@ private Map<Integer, Set<UUID>> affinityForPartitions(IgniteCache<Integer, Integ
287374 /**
288375 * Checks that copies of each partition are distributed evenly across data centers and copies are spread evenly across nodes.
289376 */
290- private void verifyDistributionProperties (
377+ private void verifyDistributionGuarantees (
291378 IgniteEx srv ,
292379 String [] dcIds ,
293380 int nodesPerDc ,
0 commit comments