3232import org .apache .ignite .client .ClientTransaction ;
3333import org .apache .ignite .configuration .CacheConfiguration ;
3434import org .apache .ignite .configuration .ClientConfiguration ;
35+ import org .apache .ignite .configuration .ClientConnectorConfiguration ;
36+ import org .apache .ignite .configuration .ThinClientConfiguration ;
3537import org .apache .ignite .internal .IgniteEx ;
3638import org .apache .ignite .internal .IgniteInterruptedCheckedException ;
3739import org .apache .ignite .internal .util .typedef .F ;
@@ -64,6 +66,9 @@ public class ThinClientPartitionAwarenessMultiDcTest extends ThinClientAbstractP
6466 /** */
6567 private IgniteEx startGrid (int idx , String dcId ) throws Exception {
6668 return startGrid (getConfiguration (getTestIgniteInstanceName (idx ))
69+ .setClientConnectorConfiguration (new ClientConnectorConfiguration ()
70+ .setThinClientConfiguration (new ThinClientConfiguration ()
71+ .setMaxActiveComputeTasksPerConnection (1 )))
6772 .setUserAttributes (F .asMap (IgniteSystemProperties .IGNITE_DATA_CENTER_ID , dcId )));
6873 }
6974
@@ -146,9 +151,17 @@ private void checkDcAwarePaRequests(
146151
147152 assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_PUT );
148153
154+ cache .putAsync (key , 0 ).get ();
155+
156+ assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_PUT );
157+
149158 cache .get (key );
150159
151160 assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_GET );
161+
162+ cache .getAsync (key ).get ();
163+
164+ assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_GET );
152165 }
153166 }
154167
@@ -158,9 +171,13 @@ private void checkDcAwarePaRequests(
158171 for (Integer key : keys ) {
159172 int primaryNodeIdx = getTestIgniteInstanceIndex (primaryNode .name ());
160173
174+ // If primary in another DC, write requests always sent to primary node.
161175 cache .put (key , 0 );
162176
163- // If primary in another DC, write requests always sent to primary node.
177+ assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_PUT );
178+
179+ cache .putAsync (key , 0 ).get ();
180+
164181 assertOpOnChannel (channels [primaryNodeIdx ], ClientOperation .CACHE_PUT );
165182
166183 int backupNodeIdx = getTestIgniteInstanceIndex (backupNode (key , cacheName ).name ());
@@ -172,6 +189,10 @@ private void checkDcAwarePaRequests(
172189
173190 assertOpOnChannel (channels [expReqToBackup ? backupNodeIdx : primaryNodeIdx ], ClientOperation .CACHE_GET );
174191
192+ cache .getAsync (key ).get ();
193+
194+ assertOpOnChannel (channels [expReqToBackup ? backupNodeIdx : primaryNodeIdx ], ClientOperation .CACHE_GET );
195+
175196 cache .containsKey (key );
176197
177198 assertOpOnChannel (channels [expReqToBackup ? backupNodeIdx : primaryNodeIdx ], ClientOperation .CACHE_CONTAINS_KEY );
@@ -219,19 +240,15 @@ private void checkDcAwareNonPaRequests(String cacheName, String dcId, int... exp
219240 for (int i = 0 ; i < 10 ; i ++) {
220241 cache .query (new ScanQuery <>()).getAll ();
221242
222- int channelIdx = nextOpChannelIdx ();
223-
224- assertTrue (F .contains (expNodeIdxs , channelIdx ));
225-
226- assertTrue ("Ops queue not empty: " + opsQueue , F .isEmpty (opsQueue ));
243+ assertExpectedChannel (expNodeIdxs );
227244
228245 client .cluster ().state ();
229246
230- channelIdx = nextOpChannelIdx ( );
247+ assertExpectedChannel ( expNodeIdxs );
231248
232- assertTrue ( F . contains ( expNodeIdxs , channelIdx ) );
249+ client . compute (). executeAsync2 ( TestTask . class . getName (), null ). get ( );
233250
234- assertTrue ( "Ops queue not empty: " + opsQueue , F . isEmpty ( opsQueue ) );
251+ assertExpectedChannel ( expNodeIdxs );
235252
236253 try (ClientTransaction tx = client .transactions ().txStart ()) {
237254 cache .put (ThreadLocalRandom .current ().nextInt (10 ), 0 );
@@ -241,7 +258,7 @@ private void checkDcAwareNonPaRequests(String cacheName, String dcId, int... exp
241258 }
242259
243260 while (true ) {
244- channelIdx = nextOpChannelIdx ();
261+ int channelIdx = nextOpChannelIdx ();
245262
246263 if (channelIdx < 0 )
247264 break ;
@@ -250,4 +267,13 @@ private void checkDcAwareNonPaRequests(String cacheName, String dcId, int... exp
250267 }
251268 }
252269 }
270+
271+ /** */
272+ private void assertExpectedChannel (int ... expChannelIdxs ) {
273+ int channelIdx = nextOpChannelIdx ();
274+
275+ assertTrue (F .contains (expChannelIdxs , channelIdx ));
276+
277+ assertTrue ("Ops queue not empty: " + opsQueue , F .isEmpty (opsQueue ));
278+ }
253279}
0 commit comments