2424import java .util .List ;
2525import java .util .Set ;
2626import java .util .UUID ;
27+ import java .util .concurrent .ConcurrentHashMap ;
2728import java .util .concurrent .CopyOnWriteArrayList ;
2829import java .util .concurrent .atomic .AtomicBoolean ;
2930import java .util .concurrent .atomic .AtomicInteger ;
31+ import java .util .concurrent .atomic .AtomicReference ;
3032import java .util .function .Consumer ;
3133import java .util .function .Function ;
3234import java .util .stream .Collectors ;
7577 * Checks the service awareness feature of the thin client.
7678 */
7779public class ServiceAwarenessTest extends AbstractThinClientTest {
80+ /** */
81+ private static final String ATTR_NODE_IDX = "test.node.idx" ;
82+
7883 /** Node-filter service name. */
7984 private static final String SRV_NAME = "node_filtered_svc" ;
8085
8186 /** Number of grids at the test start. */
82- private static final int GRIDS = 4 ;
87+ private static final int BASE_NODES_CNT = 4 ;
88+
89+ /** */
90+ private static final int TOP_UPD_NODES_CNT = 3 ;
8391
8492 /** Number of node instances with the initial service deployment. */
8593 private static final int INIT_SRVC_NODES_CNT = 2 ;
@@ -95,6 +103,7 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
95103 IgniteConfiguration cfg = super .getConfiguration (igniteInstanceName );
96104
97105 cfg .setDiscoverySpi (new TestBlockingDiscoverySpi ());
106+ cfg .setUserAttributes (Collections .singletonMap (ATTR_NODE_IDX , getTestIgniteInstanceIndex (igniteInstanceName )));
98107
99108 return cfg ;
100109 }
@@ -147,7 +156,7 @@ private static ServiceConfiguration serviceCfg() {
147156 @ Override protected void beforeTest () throws Exception {
148157 super .beforeTest ();
149158
150- startGrids (GRIDS );
159+ startGrids (BASE_NODES_CNT );
151160
152161 grid (1 ).services ().deploy (serviceCfg ());
153162 }
@@ -160,7 +169,7 @@ public void testDelayedServiceRedeploy() throws Exception {
160169 // Service topology on the client.
161170 Set <UUID > srvcTopOnClient = new GridConcurrentHashSet <>();
162171
163- addSrvcTopUpdateClientLogLsnr (srvcTopOnClient ::addAll );
172+ registerServiceTopologyUpdateListener (srvcTopOnClient ::addAll );
164173
165174 AtomicBoolean svcRunFlag = new AtomicBoolean (true );
166175
@@ -180,13 +189,13 @@ public void testDelayedServiceRedeploy() throws Exception {
180189 // Delays service redeployment and the service topology update on the server side.
181190 testDisco .toBlock .add (ServiceClusterDeploymentResultBatch .class );
182191
183- startGrid (GRIDS );
192+ startGrid (BASE_NODES_CNT );
184193
185194 waitForCondition (() -> testDisco .blocked .size () == 1 , getTestTimeout ());
186195
187196 // Ensure all the nodes have started but the service topology hasn't updated yet.
188197 for (Ignite ig : G .allGrids ()) {
189- assertEquals (ig .cluster ().nodes ().size (), GRIDS + 1 );
198+ assertEquals (ig .cluster ().nodes ().size (), BASE_NODES_CNT + 1 );
190199
191200 // Ensure there are still SRVC_FILTERED_NOIDES_CNT nodes with the service instance.
192201 assertEquals (((IgniteEx )ig ).context ().service ().serviceTopology (SRV_NAME , 0 ).size (),
@@ -195,7 +204,7 @@ public void testDelayedServiceRedeploy() throws Exception {
195204
196205 // Ensure the client's topology is not updated.
197206 assertTrue (srvcTopOnClient .size () == INIT_SRVC_NODES_CNT
198- && !srvcTopOnClient .contains (grid (GRIDS ).localNode ().id ()));
207+ && !srvcTopOnClient .contains (grid (BASE_NODES_CNT ).localNode ().id ()));
199208
200209 testDisco .release ();
201210
@@ -216,7 +225,7 @@ public void testDelayedServiceRedeploy() throws Exception {
216225
217226 waitForCondition (() -> srvcTopOnClient .size () == 3 && srvcTopOnClient .contains (grid (1 ).localNode ().id ())
218227 && srvcTopOnClient .contains (grid (2 ).localNode ().id ())
219- && srvcTopOnClient .contains (grid (GRIDS ).localNode ().id ()), getTestTimeout ());
228+ && srvcTopOnClient .contains (grid (BASE_NODES_CNT ).localNode ().id ()), getTestTimeout ());
220229 }
221230 finally {
222231 svcRunFlag .set (false );
@@ -228,31 +237,31 @@ public void testDelayedServiceRedeploy() throws Exception {
228237 */
229238 @ Test
230239 public void testNodesJoinSingleThreaded () throws Exception {
231- doTestClusterTopChangesWhileServiceCalling (3 , true , false );
240+ doTestClusterTopChangesWhileServiceCalling (false , 1 );
232241 }
233242
234243 /**
235244 * Tests several nodes come while several threads are used to call the service.
236245 */
237246 @ Test
238247 public void testNodesJoinMultiThreaded () throws Exception {
239- doTestClusterTopChangesWhileServiceCalling (3 , true , true );
248+ doTestClusterTopChangesWhileServiceCalling (false , 4 );
240249 }
241250
242251 /**
243252 * Tests several nodes leaves while one thread is used to call the service.
244253 */
245254 @ Test
246255 public void testNodesLeaveSingleThreaded () throws Exception {
247- doTestClusterTopChangesWhileServiceCalling (3 , false , false );
256+ doTestClusterTopChangesWhileServiceCalling (true , 1 );
248257 }
249258
250259 /**
251260 * Tests several nodes leave while several threads are used to call the service.
252261 */
253262 @ Test
254263 public void testNodesLeaveMultiThreaded () throws Exception {
255- doTestClusterTopChangesWhileServiceCalling (3 , false , true );
264+ doTestClusterTopChangesWhileServiceCalling (true , 4 );
256265 }
257266
258267 /**
@@ -265,7 +274,7 @@ public void testMinorTopologyVersionDoesntAffect() throws Exception {
265274
266275 Set <UUID > srvcTopOnClient = new GridConcurrentHashSet <>();
267276
268- addSrvcTopUpdateClientLogLsnr (srvcTopOnClient ::addAll );
277+ registerServiceTopologyUpdateListener (srvcTopOnClient ::addAll );
269278
270279 ((GridTestLog4jLogger )log ).setLevel (Level .DEBUG );
271280
@@ -306,7 +315,7 @@ public void testForcedServiceRedeployWhileClientIsIdle() throws Exception {
306315
307316 Set <UUID > srvcTopOnClient = new GridConcurrentHashSet <>();
308317
309- addSrvcTopUpdateClientLogLsnr (srvcTopOnClient ::addAll );
318+ registerServiceTopologyUpdateListener (srvcTopOnClient ::addAll );
310319
311320 ((GridTestLog4jLogger )log ).setLevel (Level .DEBUG );
312321
@@ -341,7 +350,7 @@ public void testForcedServiceRedeployWhileClientIsIdle() throws Exception {
341350 waitForCondition (() -> {
342351 svc .testMethod ();
343352
344- return srvcTopOnClient .size () == GRIDS ;
353+ return srvcTopOnClient .size () == BASE_NODES_CNT ;
345354 }, getTestTimeout ());
346355
347356 for (Ignite ig : G .allGrids ())
@@ -350,94 +359,84 @@ public void testForcedServiceRedeployWhileClientIsIdle() throws Exception {
350359 }
351360
352361 /** */
353- private void doTestClusterTopChangesWhileServiceCalling (
354- int nodesCnt ,
355- boolean addNodes ,
356- boolean multiThreaded )
357- throws Exception {
358- assert nodesCnt > 0 ;
359-
360- Set <UUID > newNodesUUIDs = new GridConcurrentHashSet <>();
361-
362+ private void doTestClusterTopChangesWhileServiceCalling (boolean shrinkTop , int svcInvokeThreads ) throws Exception {
362363 // Start additional nodes to stop them.
363- if (!addNodes ) {
364- startGridsMultiThreaded (GRIDS , nodesCnt );
365-
366- for (int i = GRIDS ; i < GRIDS + nodesCnt ; ++i )
367- newNodesUUIDs .add (grid (i ).localNode ().id ());
364+ if (shrinkTop ) {
365+ for (int nodeIdx = BASE_NODES_CNT ; nodeIdx < BASE_NODES_CNT + TOP_UPD_NODES_CNT ; nodeIdx ++)
366+ startGrid (nodeIdx );
368367 }
369368
370- // Service topology on the clients.
371- Set <UUID > srvcTopOnClient = new GridConcurrentHashSet <>();
369+ Set <UUID > expInitSvcTop = resolveServiceTopology ();
370+
371+ assertEquals (shrinkTop ? 5 : 2 , expInitSvcTop .size ());
372372
373- addSrvcTopUpdateClientLogLsnr (srvcTopOnClient ::addAll );
373+ // Last detected service topology on the client side.
374+ AtomicReference <Set <UUID >> svcTop = new AtomicReference <>();
375+
376+ registerServiceTopologyUpdateListener (svcTop ::set );
374377
375- AtomicBoolean changeClusterTop = new AtomicBoolean ();
376378 AtomicBoolean stopFlag = new AtomicBoolean ();
377379
378380 try (IgniteClient client = startClient ()) {
379381 ServicesTest .TestServiceInterface svc = client .services ().serviceProxy (SRV_NAME , ServicesTest .TestServiceInterface .class );
380382
381383 ((GridTestLog4jLogger )log ).setLevel (Level .DEBUG );
382384
383- IgniteInternalFuture <?> runFut = runMultiThreadedAsync (() -> {
384- do {
385- try {
386- svc .testMethod ();
387- }
388- catch (ClientException e ) {
389- String m = e .getMessage ();
390-
391- // TODO: IGNITE-20802 : Exception should not occur.
392- // Client doesn't retry service invocation if the redirected-to service instance node leaves cluster.
393- if (addNodes || (!m .contains ("Node has left grid" ) && !m .contains ("Failed to send job due to node failure" ))
394- || newNodesUUIDs .stream ().noneMatch (nid -> m .contains (nid .toString ())))
395- throw e ;
396- }
397- }
398- while (!stopFlag .get ());
399- }, multiThreaded ? 4 : 1 , "ServiceTestLoader" );
400-
401- while (!stopFlag .get ()) {
402- // Wait until the initial topology is received.
403- if (srvcTopOnClient .size () == (addNodes ? INIT_SRVC_NODES_CNT : INIT_SRVC_NODES_CNT + nodesCnt )
404- && changeClusterTop .compareAndSet (false , true )) {
405- srvcTopOnClient .clear ();
406-
407- for (int i = 0 ; i < nodesCnt ; ++i ) {
408- int nodeIdx = GRIDS + i ;
409-
410- runAsync (() -> {
411- try {
412- if (addNodes )
413- newNodesUUIDs .add (startGrid (nodeIdx ).localNode ().id ());
414- else
415- stopGrid (nodeIdx );
416- }
417- catch (Exception e ) {
418- log .error ("Unable to start or stop test grid." , e );
419-
420- stopFlag .set (true );
421- }
422- });
385+ IgniteInternalFuture <?> svcInvokeFut = runMultiThreadedAsync (
386+ () -> {
387+ do {
388+ try {
389+ svc .testMethod ();
390+ }
391+ catch (Exception e ) {
392+ String errMsg = e .getMessage ();
393+
394+ // TODO: IGNITE-20802 : Exception should not occur.
395+ // Client doesn't retry service invocation if the redirected-to service instance node leaves cluster.
396+ boolean isErrCausedByNodeLeave = errMsg .contains ("Failed to execute task due to grid shutdown" )
397+ || (errMsg .contains ("Node has left grid" ) || errMsg .contains ("Failed to send job due to node failure" ))
398+ && expInitSvcTop .stream ().anyMatch (id -> errMsg .contains (id .toString ()));
399+
400+ assertTrue (shrinkTop && isErrCausedByNodeLeave );
401+ }
423402 }
424- }
403+ while (!stopFlag .get ());
404+ },
405+ svcInvokeThreads ,
406+ "ServiceTestLoader"
407+ );
425408
426- // Stop if new excepted service topology received.
427- if (srvcTopOnClient .size () == (addNodes ? INIT_SRVC_NODES_CNT + nodesCnt : INIT_SRVC_NODES_CNT ))
428- stopFlag .set (true );
409+ assertTrue (waitForCondition (() -> expInitSvcTop .equals (svcTop .get ()), getTestTimeout ()));
429410
430- Thread .sleep (10 );
411+ Collection <IgniteInternalFuture <?>> topUpdFuts = ConcurrentHashMap .newKeySet ();
412+
413+ for (int i = 0 ; i < TOP_UPD_NODES_CNT ; ++i ) {
414+ int nodeIdx = BASE_NODES_CNT + i ;
415+
416+ topUpdFuts .add (runAsync (() -> {
417+ if (shrinkTop )
418+ stopGrid (nodeIdx );
419+ else
420+ startGrid (nodeIdx );
421+ }));
431422 }
432423
433- runFut .get ();
434- }
424+ for (IgniteInternalFuture <?> topUpdFut : topUpdFuts )
425+ topUpdFut .get (getTestTimeout ());
426+
427+ Set <UUID > expUpdSvcTop = resolveServiceTopology ();
428+
429+ assertEquals (shrinkTop ? 2 : 5 , expUpdSvcTop .size ());
435430
436- // The initial nodes must always persist it the service topology.
437- assertTrue (srvcTopOnClient .contains (grid (1 ).localNode ().id ())
438- && srvcTopOnClient .contains (grid (2 ).localNode ().id ()));
431+ assertTrue (waitForCondition (() -> expUpdSvcTop .equals (svcTop .get ()), getTestTimeout ()));
439432
440- assertEquals (addNodes ? nodesCnt : 0 , newNodesUUIDs .stream ().filter (srvcTopOnClient ::contains ).count ());
433+ stopFlag .set (true );
434+
435+ svcInvokeFut .get (getTestTimeout ());
436+ }
437+ finally {
438+ stopFlag .set (true );
439+ }
441440 }
442441
443442 /**
@@ -502,7 +501,7 @@ private void doTestServiceAwarenessForClusterGroup(@Nullable Collection<UUID> gr
502501 ? top
503502 : grp .stream ().filter (nid -> new TestNodeFilter ().apply (grid (0 ).cluster ().node (nid ))).collect (Collectors .toSet ());
504503
505- addSrvcTopUpdateClientLogLsnr (uuids -> {
504+ registerServiceTopologyUpdateListener (uuids -> {
506505 // Reset counters on the first topology update.
507506 if (top .isEmpty ())
508507 redirectCnt .set (0 );
@@ -590,7 +589,7 @@ private void callService(
590589 }
591590
592591 /** Extracts ids of received service instance nodes from the client log. */
593- private static void addSrvcTopUpdateClientLogLsnr (Consumer <Set <UUID >> srvTopConsumer ) {
592+ private static void registerServiceTopologyUpdateListener (Consumer <Set <UUID >> srvTopConsumer ) {
594593 clientLogLsnr .registerListener (s -> {
595594 if (s .contains ("Topology of service '" + SRV_NAME + "' has been updated. The service instance nodes: " )) {
596595 String nodes = s .substring (s .lastIndexOf (": [" ) + 3 , s .length () - 2 );
@@ -612,6 +611,15 @@ private IgniteClient startClient(@Nullable Collection<UUID> requestedServerNodes
612611 getClientConfiguration (grid (0 )));
613612 }
614613
614+ /** */
615+ private static Set <UUID > resolveServiceTopology () {
616+ return G .allGrids ().stream ()
617+ .map (g -> g .cluster ().localNode ())
618+ .filter (TestNodeFilter ::test )
619+ .map (ClusterNode ::id )
620+ .collect (Collectors .toSet ());
621+ }
622+
615623 /**
616624 * Accepts nodes with the name index equal to 1, 2 or >= GRIDS.
617625 */
@@ -621,21 +629,14 @@ private static final class TestNodeFilter implements IgnitePredicate<ClusterNode
621629
622630 /** {@inheritDoc} */
623631 @ Override public boolean apply (ClusterNode node ) {
624- String nodeName = node .attribute ("org.apache.ignite.ignite.name" );
625-
626- if (F .isEmpty (nodeName ))
627- return false ;
628-
629- int nodeIdx = -1 ;
632+ return test (node );
633+ }
630634
631- try {
632- nodeIdx = Integer .parseInt (nodeName .substring (nodeName .length () - 1 ));
633- }
634- catch (Exception e ) {
635- // No-op.
636- }
635+ /** */
636+ static boolean test (ClusterNode node ) {
637+ int nodeIdx = node .attribute (ATTR_NODE_IDX );
637638
638- return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= GRIDS ;
639+ return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= BASE_NODES_CNT ;
639640 }
640641 }
641642
0 commit comments