Skip to content

Commit e5305d7

Browse files
authored
IGNITE-27911 Fixed flaky ContinuousQueryBuffersCleanupTest (#12769)
1 parent a6ed7ff commit e5305d7

File tree

2 files changed

+16
-13
lines changed

2 files changed

+16
-13
lines changed

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheConrinuousQueryUtils.java renamed to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/TestCacheContinuousQueryUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.ignite.internal.util.GridAtomicLong;
2222

2323
/** */
24-
public class TestCacheConrinuousQueryUtils {
24+
public class TestCacheContinuousQueryUtils {
2525
/** */
2626
public static Map<Integer, CacheContinuousQueryEventBuffer> partitionContinuesQueryEntryBuffers(
2727
CacheContinuousQueryHandler<?, ?> hnd

modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.ignite.Ignite;
3030
import org.apache.ignite.IgniteCache;
3131
import org.apache.ignite.Ignition;
32-
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
3332
import org.apache.ignite.cache.query.ContinuousQuery;
3433
import org.apache.ignite.cache.query.Query;
3534
import org.apache.ignite.cache.query.QueryCursor;
@@ -39,6 +38,7 @@
3938
import org.apache.ignite.configuration.IgniteConfiguration;
4039
import org.apache.ignite.internal.IgniteEx;
4140
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
41+
import org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
4242
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
4343
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
4444
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer;
@@ -48,14 +48,16 @@
4848
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
4949
import org.junit.Test;
5050

51+
import static java.util.Collections.singletonMap;
5152
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5253
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
5354
import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
55+
import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
5456
import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD;
55-
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.backupQueueSize;
56-
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.bufferedEntries;
57-
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.maxReceivedBackupAcknowledgeUpdateCounter;
58-
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheConrinuousQueryUtils.partitionContinuesQueryEntryBuffers;
57+
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.backupQueueSize;
58+
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.bufferedEntries;
59+
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.maxReceivedBackupAcknowledgeUpdateCounter;
60+
import static org.apache.ignite.internal.processors.cache.query.continuous.TestCacheContinuousQueryUtils.partitionContinuesQueryEntryBuffers;
5961
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
6062

6163
/** */
@@ -66,6 +68,7 @@ public class ContinuousQueryBuffersCleanupTest extends GridCommonAbstractTest {
6668
/** {@inheritDoc} */
6769
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
6870
return super.getConfiguration(igniteInstanceName)
71+
.setUserAttributes(singletonMap(IDX_ATTR, getTestIgniteInstanceIndex(igniteInstanceName)))
6972
.setCommunicationSpi(new TestRecordingCommunicationSpi()
7073
.setAckSendThreshold(1));
7174
}
@@ -77,11 +80,11 @@ public class ContinuousQueryBuffersCleanupTest extends GridCommonAbstractTest {
7780

7881
/** */
7982
@Test
80-
public void testBackupUpdateAndBackupContinuusQueryAcknowledgmentReordered() throws Exception {
83+
public void testBackupUpdateAndBackupContinuousQueryAcknowledgmentReordered() throws Exception {
8184
startGrids(2);
8285

8386
IgniteCache<Object, Object> cache = grid(0).createCache(new CacheConfiguration<>()
84-
.setAffinity(new RendezvousAffinityFunction().setPartitions(2))
87+
.setAffinity(new GridCacheModuloAffinityFunction(2, 1))
8588
.setName(DEFAULT_CACHE_NAME)
8689
.setBackups(1));
8790

@@ -97,7 +100,7 @@ public void testBackupUpdateAndBackupContinuusQueryAcknowledgmentReordered() thr
97100

98101
// Here we are waiting for initialization of CQ entry buffers on backup node, otherwise backup node will ignore
99102
// acknowledgements.
100-
assertTrue(waitForCondition(() -> !continuosQueryEntryBuffers(grid(1)).isEmpty(), getTestTimeout()));
103+
assertTrue(waitForCondition(() -> !continuousQueryEntryBuffers(grid(1)).isEmpty(), getTestTimeout()));
101104

102105
spi(grid(0)).blockMessages(GridDhtAtomicSingleUpdateRequest.class, grid(1).name());
103106

@@ -111,7 +114,7 @@ public void testBackupUpdateAndBackupContinuusQueryAcknowledgmentReordered() thr
111114

112115
assertTrue(cqListenerNotifiedLatch.await(getTestTimeout(), MILLISECONDS));
113116

114-
CacheContinuousQueryEventBuffer buf = continuosQueryEntryBuffers(grid(1)).get(part);
117+
CacheContinuousQueryEventBuffer buf = continuousQueryEntryBuffers(grid(1)).get(part);
115118

116119
assertTrue(waitForCondition(() -> maxReceivedBackupAcknowledgeUpdateCounter(buf).get() == 2, getTestTimeout()));
117120

@@ -129,7 +132,7 @@ public void testContinuousQueryBuffersCleanup() throws Exception {
129132

130133
try (IgniteClient thinCli = Ignition.startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800"))) {
131134
grid(0).createCache(new CacheConfiguration<Integer, Integer>()
132-
.setAffinity(new RendezvousAffinityFunction().setPartitions(2))
135+
.setAffinity(new GridCacheModuloAffinityFunction(2, 1))
133136
.setName(DEFAULT_CACHE_NAME)
134137
.setWriteSynchronizationMode(FULL_SYNC)
135138
.setBackups(1));
@@ -149,7 +152,7 @@ private void checkBuffersCleared(int primaryNodeIdx, Function<Query<?>, QueryCur
149152

150153
int cacheOpRounds = Math.round(DFLT_CONTINUOUS_QUERY_BACKUP_ACK_THRESHOLD * 0.3f);
151154

152-
// We repeatedly perform 5 cache operations that raise CREATE, UPDATE, REMOVED, EXPIRIED events.
155+
// We repeatedly perform 5 cache operations that raise CREATE, UPDATE, REMOVED, EXPIRED events.
153156
// The total number of events is selected in a such way as to check for a backup notification due to a
154157
// buffer overflow, and then by timeout.
155158
int expEvtsCnt = cacheOpRounds * 5;
@@ -215,7 +218,7 @@ private boolean isContinuesQueryBufferEmpty(Ignite ignite) {
215218
}
216219

217220
/** */
218-
private Map<Integer, CacheContinuousQueryEventBuffer> continuosQueryEntryBuffers(IgniteEx ignite) {
221+
private Map<Integer, CacheContinuousQueryEventBuffer> continuousQueryEntryBuffers(IgniteEx ignite) {
219222
GridContinuousProcessor contProc = ignite.context().continuous();
220223

221224
return partitionContinuesQueryEntryBuffers(

0 commit comments

Comments
 (0)