Skip to content

Commit 6f7b23e

Browse files
committed
IGNITE-26877 Fixed flaky ServiceAwarenessTest.testNodesLeaveMultiThreaded
1 parent 674e214 commit 6f7b23e

File tree

1 file changed

+34
-27
lines changed

1 file changed

+34
-27
lines changed

modules/core/src/test/java/org/apache/ignite/internal/client/thin/ServiceAwarenessTest.java

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -98,18 +98,12 @@ public class ServiceAwarenessTest extends AbstractThinClientTest {
9898
/** */
9999
private static ListeningTestLogger clientLogLsnr;
100100

101-
/** */
102-
private final UUID[] nodeIds = new UUID[BASE_NODES_CNT + TOP_UPD_NODES_CNT];
103-
104101
/** {@inheritDoc} */
105102
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
106103
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
107104

108-
int nodeIdx = getTestIgniteInstanceIndex(igniteInstanceName);
109-
110105
cfg.setDiscoverySpi(new TestBlockingDiscoverySpi());
111-
cfg.setNodeId(nodeIds[nodeIdx]);
112-
cfg.setUserAttributes(Collections.singletonMap(ATTR_NODE_IDX, nodeIdx));
106+
cfg.setUserAttributes(Collections.singletonMap(ATTR_NODE_IDX, getTestIgniteInstanceIndex(igniteInstanceName)));
113107

114108
return cfg;
115109
}
@@ -162,9 +156,6 @@ private static ServiceConfiguration serviceCfg() {
162156
@Override protected void beforeTest() throws Exception {
163157
super.beforeTest();
164158

165-
for (int i = 0; i < nodeIds.length; i++)
166-
nodeIds[i] = UUID.randomUUID();
167-
168159
startGrids(BASE_NODES_CNT);
169160

170161
grid(1).services().deploy(serviceCfg());
@@ -369,19 +360,22 @@ public void testForcedServiceRedeployWhileClientIsIdle() throws Exception {
369360

370361
/** */
371362
private void doTestClusterTopChangesWhileServiceCalling(boolean shrinkTop, int svcInvokeThreads) throws Exception {
372-
Set<UUID> expInitSvcTop = shrinkTop ? nodeIds(1, 2, 4, 5, 6) : nodeIds(1, 2);
373-
Set<UUID> expUpdSvcTop = shrinkTop ? nodeIds(1, 2) : nodeIds(1, 2, 4, 5, 6);
374-
375363
// Start additional nodes to stop them.
376-
if (shrinkTop)
377-
startGridsMultiThreaded(BASE_NODES_CNT, TOP_UPD_NODES_CNT);
364+
if (shrinkTop) {
365+
for (int nodeIdx = BASE_NODES_CNT; nodeIdx < BASE_NODES_CNT + TOP_UPD_NODES_CNT; nodeIdx++)
366+
startGrid(nodeIdx);
367+
}
368+
369+
Set<UUID> expInitSvcTop = resolveServiceTopology();
370+
371+
assertEquals(shrinkTop ? 5 : 2, expInitSvcTop.size());
378372

379-
// Service topology on the clients.
380-
AtomicReference<Set<UUID>> actualSvcTop = new AtomicReference<>();
373+
// Last detected service topology on the client side.
374+
AtomicReference<Set<UUID>> svcTop = new AtomicReference<>();
381375

382-
registerServiceTopologyUpdateListener(actualSvcTop::set);
376+
registerServiceTopologyUpdateListener(svcTop::set);
383377

384-
AtomicBoolean isTestStopped = new AtomicBoolean();
378+
AtomicBoolean stopFlag = new AtomicBoolean();
385379

386380
try (IgniteClient client = startClient()) {
387381
ServicesTest.TestServiceInterface svc = client.services().serviceProxy(SRV_NAME, ServicesTest.TestServiceInterface.class);
@@ -404,13 +398,13 @@ private void doTestClusterTopChangesWhileServiceCalling(boolean shrinkTop, int s
404398
&& expInitSvcTop.stream().anyMatch(id -> errMsg.contains(id.toString())));
405399
}
406400
}
407-
while (!isTestStopped.get());
401+
while (!stopFlag.get());
408402
},
409403
svcInvokeThreads,
410404
"ServiceTestLoader"
411405
);
412406

413-
assertTrue(waitForCondition(() -> expInitSvcTop.equals(actualSvcTop.get()), getTestTimeout()));
407+
assertTrue(waitForCondition(() -> expInitSvcTop.equals(svcTop.get()), getTestTimeout()));
414408

415409
Collection<IgniteInternalFuture<?>> topUpdFuts = ConcurrentHashMap.newKeySet();
416410

@@ -425,17 +419,21 @@ private void doTestClusterTopChangesWhileServiceCalling(boolean shrinkTop, int s
425419
}));
426420
}
427421

428-
assertTrue(waitForCondition(() -> expUpdSvcTop.equals(actualSvcTop.get()), getTestTimeout()));
429-
430422
for (IgniteInternalFuture<?> topUpdFut : topUpdFuts)
431423
topUpdFut.get(getTestTimeout());
432424

433-
isTestStopped.set(true);
425+
Set<UUID> expUpdSvcTop = resolveServiceTopology();
426+
427+
assertEquals(shrinkTop ? 2 : 5, expUpdSvcTop.size());
428+
429+
assertTrue(waitForCondition(() -> expUpdSvcTop.equals(svcTop.get()), getTestTimeout()));
430+
431+
stopFlag.set(true);
434432

435433
svcInvokeFut.get(getTestTimeout());
436434
}
437435
finally {
438-
isTestStopped.set(true);
436+
stopFlag.set(true);
439437
}
440438
}
441439

@@ -612,8 +610,12 @@ private IgniteClient startClient(@Nullable Collection<UUID> requestedServerNodes
612610
}
613611

614612
/** */
615-
private Set<UUID> nodeIds(int... nodeIdxs) {
616-
return Arrays.stream(nodeIdxs).mapToObj(idx -> nodeIds[idx]).collect(Collectors.toSet());
613+
private static Set<UUID> resolveServiceTopology() {
614+
return G.allGrids().stream()
615+
.map(g -> g.cluster().localNode())
616+
.filter(TestNodeFilter::test)
617+
.map(ClusterNode::id)
618+
.collect(Collectors.toSet());
617619
}
618620

619621
/**
@@ -625,6 +627,11 @@ private static final class TestNodeFilter implements IgnitePredicate<ClusterNode
625627

626628
/** {@inheritDoc} */
627629
@Override public boolean apply(ClusterNode node) {
630+
return test(node);
631+
}
632+
633+
/** */
634+
static boolean test(ClusterNode node) {
628635
int nodeIdx = node.attribute(ATTR_NODE_IDX);
629636

630637
return nodeIdx == 1 || nodeIdx == 2 || nodeIdx >= BASE_NODES_CNT;

0 commit comments

Comments
 (0)