Skip to content

Commit cb021f5

Browse files
authored
IGNITE-27873 Fix flaky IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest related tests (#12749)
1 parent d8904e9 commit cb021f5

File tree

2 files changed

+22
-49
lines changed

2 files changed

+22
-49
lines changed

modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryAbstractDistributedJoinSelfTest.java

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,14 @@
1818
package org.apache.ignite.internal.processors.cache.distributed.near;
1919

2020
import java.io.Serializable;
21-
import java.util.Random;
2221
import org.apache.ignite.IgniteCache;
2322
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
2423
import org.apache.ignite.cache.query.annotations.QuerySqlField;
25-
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
2624
import org.apache.ignite.configuration.CacheConfiguration;
2725
import org.apache.ignite.configuration.IgniteConfiguration;
28-
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
2926
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
30-
import org.apache.ignite.internal.util.GridRandom;
3127
import org.apache.ignite.internal.util.typedef.F;
32-
import org.apache.ignite.internal.util.typedef.internal.U;
28+
import org.apache.ignite.testframework.GridTestUtils;
3329

3430
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
3531
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -65,10 +61,10 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
6561
"order by co._key, pr._key ";
6662

6763
/** */
68-
protected static final String QRY_LONG = "select pe.id, co.id, pr._key\n" +
64+
protected static final String QRY_LONG = "select id1 from (select pe.id as id1, co.id, pr._key\n" +
6965
"from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
70-
"where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
71-
"order by pe.id desc";
66+
"where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key\n" +
67+
"order by pe.id) where id1 > sleep(10)";
7268

7369
/** */
7470
protected static final int GRID_CNT = 2;
@@ -104,6 +100,7 @@ public class IgniteCacheQueryAbstractDistributedJoinSelfTest extends AbstractInd
104100
cc.setRebalanceMode(SYNC);
105101
cc.setLongQueryWarningTimeout(15_000);
106102
cc.setAffinity(new RendezvousAffinityFunction(false, 60));
103+
cc.setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class);
107104

108105
switch (name) {
109106
case "pe":
@@ -163,10 +160,8 @@ private void fillCaches() {
163160

164161
IgniteCache<Integer, Product> pr = grid(0).cache("pr");
165162

166-
Random rnd = new GridRandom();
167-
168163
for (int i = 0; i < PRODUCT_CNT; i++)
169-
pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
164+
pr.put(i, new Product(i, i % COMPANY_CNT));
170165

171166
IgniteCache<Integer, Person> pe = grid(0).cache("pe");
172167

@@ -176,8 +171,8 @@ private void fillCaches() {
176171
IgniteCache<Integer, Purchase> pu = grid(0).cache("pu");
177172

178173
for (int i = 0; i < PURCHASE_CNT; i++) {
179-
int persId = rnd.nextInt(PERS_CNT);
180-
int prodId = rnd.nextInt(PRODUCT_CNT);
174+
int persId = i % PERS_CNT;
175+
int prodId = i % PRODUCT_CNT;
181176

182177
pu.put(i, new Purchase(persId, prodId));
183178
}
@@ -258,20 +253,4 @@ protected static class Product implements Serializable {
258253
this.companyId = companyId;
259254
}
260255
}
261-
262-
/** */
263-
public static class Functions {
264-
/** */
265-
@QuerySqlFunction
266-
public static int sleep() {
267-
try {
268-
U.sleep(1_000);
269-
}
270-
catch (IgniteInterruptedCheckedException ignored) {
271-
// No-op.
272-
}
273-
274-
return 0;
275-
}
276-
}
277256
}

modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryStopOnCancelOrTimeoutDistributedJoinSelfTest.java

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,20 @@
2525
import javax.cache.CacheException;
2626
import org.apache.ignite.Ignite;
2727
import org.apache.ignite.IgniteCache;
28-
import org.apache.ignite.IgniteCheckedException;
28+
import org.apache.ignite.IgniteException;
2929
import org.apache.ignite.cache.query.QueryCancelledException;
3030
import org.apache.ignite.cache.query.QueryCursor;
3131
import org.apache.ignite.cache.query.SqlFieldsQuery;
3232
import org.apache.ignite.internal.IgniteEx;
33-
import org.apache.ignite.internal.processors.GridProcessor;
33+
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
3434
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
35+
import org.apache.ignite.internal.util.typedef.F;
3536
import org.apache.ignite.internal.util.typedef.X;
3637
import org.apache.ignite.internal.util.typedef.internal.U;
3738
import org.junit.Test;
3839

40+
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
41+
3942
/**
4043
* Test for cancel of query containing distributed joins.
4144
*/
@@ -104,49 +107,40 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti
104107
else {
105108
cursor = cache.query(qry);
106109

107-
ignite.scheduler().runLocal(new Runnable() {
108-
@Override public void run() {
109-
cursor.close();
110-
}
111-
}, timeoutUnits, timeUnit);
110+
ignite.scheduler().runLocal(cursor::close, timeoutUnits, timeUnit);
112111
}
113112

114113
try (QueryCursor<List<?>> ignored = cursor) {
115-
cursor.getAll();
114+
int resSize = F.size(cursor.iterator());
116115

117116
if (checkCanceled)
118-
fail("Query not canceled");
117+
fail("Query not canceled, result size=" + resSize);
119118
}
120-
catch (CacheException ex) {
121-
log().error("Got expected exception", ex);
119+
catch (CacheException | IgniteException ex) {
120+
log().error("Got exception", ex);
122121

123122
assertNotNull("Must throw correct exception", X.cause(ex, QueryCancelledException.class));
124123
}
125124

126-
// Give some time to clean up.
127-
Thread.sleep(TimeUnit.MILLISECONDS.convert(timeoutUnits, timeUnit) + 3_000);
128-
129125
checkCleanState();
130126
}
131127

132128
/**
133129
* Validates clean state on all participating nodes after query cancellation.
134130
*/
135-
private void checkCleanState() throws IgniteCheckedException {
131+
private void checkCleanState() throws IgniteInterruptedCheckedException {
136132
for (int i = 0; i < GRID_CNT; i++) {
137133
IgniteEx grid = grid(i);
138134

139135
// Validate everything was cleaned up.
140-
ConcurrentMap<UUID, ?> map = U.field(((IgniteH2Indexing)U.field((GridProcessor)U.field(
141-
grid.context(), "qryProc"), "idx")).mapQueryExecutor(), "qryRess");
142-
143-
String msg = "Map executor state is not cleared";
136+
ConcurrentMap<UUID, ?> map = U.field(
137+
((IgniteH2Indexing)grid.context().query().getIndexing()).mapQueryExecutor(), "qryRess");
144138

145139
// TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
146140
for (Object result : map.values()) {
147141
Map<Long, ?> m = U.field(result, "res");
148142

149-
assertEquals(msg, 0, m.size());
143+
assertTrue("Map executor state is not cleared", waitForCondition(m::isEmpty, 1_000L));
150144
}
151145
}
152146
}

0 commit comments

Comments
 (0)