2525import java .util .concurrent .CompletableFuture ;
2626import java .util .concurrent .ConcurrentHashMap ;
2727import java .util .concurrent .CountDownLatch ;
28+ import java .util .concurrent .ThreadLocalRandom ;
2829import java .util .concurrent .TimeUnit ;
2930import java .util .concurrent .atomic .AtomicInteger ;
3031import org .apache .ignite .IgniteCheckedException ;
3839import org .apache .ignite .internal .processors .cache .query .reducer .TextQueryReducer ;
3940import org .apache .ignite .internal .processors .cache .query .reducer .UnsortedCacheQueryReducer ;
4041import org .apache .ignite .internal .util .lang .GridPlainCallable ;
42+ import org .apache .ignite .internal .util .typedef .F ;
4143import org .apache .ignite .internal .util .typedef .internal .U ;
4244
4345import static org .apache .ignite .internal .processors .cache .query .GridCacheQueryType .INDEX ;
@@ -60,7 +62,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
6062 /** Count of streams that finish receiving remote pages. */
6163 private final AtomicInteger noRemotePagesStreamsCnt = new AtomicInteger ();
6264
63- /** Count down this latch when every node responses on initial cache query request. */
65+ /** Count down this latch when every localNodeOrRemote responses on initial cache query request. */
6466 private final CountDownLatch firstPageLatch = new CountDownLatch (1 );
6567
6668 /** Set of nodes that deliver their first page. */
@@ -92,7 +94,7 @@ protected GridCacheDistributedQueryFuture(
9294 qryMgr = (GridCacheDistributedQueryManager <K , V >)ctx .queries ();
9395
9496 if (qry .query ().partition () != null )
95- nodes = Collections .singletonList (node (nodes ));
97+ nodes = Collections .singletonList (cctx . isReplicated () ? localOrRemoteNode ( nodes ) : F . first (nodes ));
9698
9799 streams = new ConcurrentHashMap <>(nodes .size ());
98100
@@ -118,19 +120,24 @@ protected GridCacheDistributedQueryFuture(
118120 }
119121
120122 /**
121- * @return Nodes for query execution .
123+ * @return A local node if available, otherwise a random node from the given collection .
122124 */
123- private ClusterNode node (Collection <ClusterNode > nodes ) {
124- ClusterNode rmtNode = null ;
125+ private static ClusterNode localOrRemoteNode (Collection <ClusterNode > nodes ) {
126+ int remoteNodeIdx = ThreadLocalRandom .current ().nextInt (nodes .size ());
127+
128+ ClusterNode remoteNode = null ;
125129
126130 for (ClusterNode node : nodes ) {
127131 if (node .isLocal ())
128132 return node ;
129133
130- rmtNode = node ;
134+ if (remoteNodeIdx -- == 0 )
135+ remoteNode = node ;
131136 }
132137
133- return rmtNode ;
138+ assert remoteNode != null ;
139+
140+ return remoteNode ;
134141 }
135142
136143 /** {@inheritDoc} */
@@ -150,7 +157,7 @@ private ClusterNode node(Collection<ClusterNode> nodes) {
150157 NodePageStream <R > stream = streams .get (nodeId );
151158
152159 if (stream != null && stream .hasRemotePages ())
153- onError (new ClusterTopologyCheckedException ("Remote node has left topology: " + nodeId ));
160+ onError (new ClusterTopologyCheckedException ("Remote localNodeOrRemote has left topology: " + nodeId ));
154161 }
155162
156163 /** {@inheritDoc} */
@@ -282,7 +289,7 @@ private void cancelPages(UUID nodeId) {
282289 GridCacheQueryRequest req = GridCacheQueryRequest .cancelRequest (cctx , reqId , fields ());
283290
284291 if (nodeId .equals (cctx .localNodeId ())) {
285- // Process cancel query directly (without sending) for local node ,
292+ // Process cancel query directly (without sending) for local localNodeOrRemote ,
286293 cctx .closures ().callLocalSafe (new GridPlainCallable <Object >() {
287294 @ Override public Object call () {
288295 qryMgr .processQueryRequest (cctx .localNodeId (), req );
@@ -298,10 +305,10 @@ private void cancelPages(UUID nodeId) {
298305 catch (IgniteCheckedException e ) {
299306 if (cctx .io ().checkNodeLeft (nodeId , e , false )) {
300307 if (log .isDebugEnabled ())
301- log .debug ("Failed to send cancel request, node failed: " + nodeId );
308+ log .debug ("Failed to send cancel request, localNodeOrRemote failed: " + nodeId );
302309 }
303310 else
304- U .error (log , "Failed to send cancel request [node =" + nodeId + ']' , e );
311+ U .error (log , "Failed to send cancel request [localNodeOrRemote =" + nodeId + ']' , e );
305312 }
306313 }
307314 }
0 commit comments