2525import javax .cache .CacheException ;
2626import org .apache .ignite .Ignite ;
2727import org .apache .ignite .IgniteCache ;
28+ import org .apache .ignite .IgniteException ;
2829import org .apache .ignite .cache .query .QueryCancelledException ;
2930import org .apache .ignite .cache .query .QueryCursor ;
3031import org .apache .ignite .cache .query .SqlFieldsQuery ;
3132import org .apache .ignite .internal .IgniteEx ;
32- import org .apache .ignite .internal .processors . GridProcessor ;
33+ import org .apache .ignite .internal .IgniteInterruptedCheckedException ;
3334import org .apache .ignite .internal .processors .query .h2 .IgniteH2Indexing ;
35+ import org .apache .ignite .internal .util .typedef .F ;
3436import org .apache .ignite .internal .util .typedef .X ;
3537import org .apache .ignite .internal .util .typedef .internal .U ;
3638import org .junit .Test ;
3739
40+ import static org .apache .ignite .testframework .GridTestUtils .waitForCondition ;
41+
3842/**
3943 * Test for cancel of query containing distributed joins.
4044 */
@@ -90,7 +94,7 @@ public void testTimeout4() throws Exception {
9094 /** */
9195 private void testQueryCancel (Ignite ignite , String cacheName , String sql , int timeoutUnits , TimeUnit timeUnit ,
9296 boolean timeout , boolean checkCanceled ) throws Exception {
93- SqlFieldsQuery qry = new SqlFieldsQuery (sql ).setDistributedJoins (true );;
97+ SqlFieldsQuery qry = new SqlFieldsQuery (sql ).setDistributedJoins (true );
9498
9599 IgniteCache <Object , Object > cache = ignite .cache (cacheName );
96100
@@ -107,44 +111,36 @@ private void testQueryCancel(Ignite ignite, String cacheName, String sql, int ti
107111 }
108112
109113 try (QueryCursor <List <?>> ignored = cursor ) {
110- int resSize = 0 ;
111- for (List <?> ignored1 : cursor ) {
112- ++resSize ;
113- }
114+ int resSize = F .size (cursor .iterator ());
114115
115116 if (checkCanceled )
116117 fail ("Query not canceled, result size=" + resSize );
117118 }
118- catch (CacheException ex ) {
119- log ().error ("Got expected exception" , ex );
119+ catch (CacheException | IgniteException ex ) {
120+ log ().error ("Got exception" , ex );
120121
121122 assertNotNull ("Must throw correct exception" , X .cause (ex , QueryCancelledException .class ));
122123 }
123124
124- // Give some time to clean up.
125- Thread .sleep (TimeUnit .MILLISECONDS .convert (timeoutUnits , timeUnit ) + 1_000 );
126-
127125 checkCleanState ();
128126 }
129127
130128 /**
131129 * Validates clean state on all participating nodes after query cancellation.
132130 */
133- private void checkCleanState () {
131+ private void checkCleanState () throws IgniteInterruptedCheckedException {
134132 for (int i = 0 ; i < GRID_CNT ; i ++) {
135133 IgniteEx grid = grid (i );
136134
137135 // Validate everything was cleaned up.
138- ConcurrentMap <UUID , ?> map = U .field (((IgniteH2Indexing )U .field ((GridProcessor )U .field (
139- grid .context (), "qryProc" ), "idx" )).mapQueryExecutor (), "qryRess" );
140-
141- String msg = "Map executor state is not cleared" ;
136+ ConcurrentMap <UUID , ?> map = U .field (
137+ ((IgniteH2Indexing )grid .context ().query ().getIndexing ()).mapQueryExecutor (), "qryRess" );
142138
143139 // TODO FIXME Current implementation leaves map entry for each node that's ever executed a query.
144140 for (Object result : map .values ()) {
145141 Map <Long , ?> m = U .field (result , "res" );
146142
147- assertEquals ( msg , 0 , m . size ( ));
143+ assertTrue ( "Map executor state is not cleared" , waitForCondition ( m :: isEmpty , 1_000L ));
148144 }
149145 }
150146 }
0 commit comments