Skip to content

Commit ce8ab50

Browse files
committed
Fix
1 parent 739d4af commit ce8ab50

File tree

3 files changed

+24
-26
lines changed

3 files changed

+24
-26
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -840,9 +840,12 @@ private Collection<ClusterNode> nodes() throws IgniteCheckedException {
840840
}
841841

842842
/**
843+
* Collects query data nodes matching specified {@code prj} and {@code part}.
844+
*
843845
* @param cctx Cache context.
844846
* @param prj Projection (optional).
845-
* @return Collection of data nodes in provided projection (if any).
847+
* @param part Partition (optional).
848+
* @return Collection of data nodes matching specified {@code prj} and {@code part}.
846849
* @throws IgniteCheckedException If partition number is invalid.
847850
*/
848851
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
@@ -856,25 +859,17 @@ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
856859
if (prj == null && part == null)
857860
return affNodes;
858861

859-
if (part != null && part >= cctx.affinity().partitions())
860-
throw new IgniteCheckedException("Invalid partition number: " + part);
861-
862-
final Set<ClusterNode> partHolders;
863-
864862
if (part != null) {
865-
if (cctx.config().getCacheMode() == CacheMode.PARTITIONED)
866-
partHolders = Collections.singleton(cctx.affinity().primaryByPartition(part, topVer));
867-
else
868-
partHolders = new HashSet<>(cctx.topology().owners(part, topVer));
863+
if (part >= cctx.affinity().partitions())
864+
throw new IgniteCheckedException("Invalid partition number: " + part);
865+
866+
affNodes = cctx.topology().nodes(part, topVer);
869867
}
870-
else
871-
partHolders = Collections.emptySet();
872868

873869
return F.view(affNodes, new P1<ClusterNode>() {
874870
@Override public boolean apply(ClusterNode n) {
875871
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
876-
(prj == null || prj.node(n.id()) != null) &&
877-
(part == null || partHolders.contains(n));
872+
(prj == null || prj.node(n.id()) != null);
878873
}
879874
});
880875
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ThreadLocalRandom;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.atomic.AtomicInteger;
3031
import org.apache.ignite.IgniteCheckedException;
@@ -38,6 +39,7 @@
3839
import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
3940
import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
4041
import org.apache.ignite.internal.util.lang.GridPlainCallable;
42+
import org.apache.ignite.internal.util.typedef.F;
4143
import org.apache.ignite.internal.util.typedef.internal.U;
4244

4345
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
@@ -92,7 +94,7 @@ protected GridCacheDistributedQueryFuture(
9294
qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();
9395

9496
if (qry.query().partition() != null)
95-
nodes = Collections.singletonList(node(nodes));
97+
nodes = Collections.singletonList(cctx.isReplicated() ? localOrRemoteNode(nodes) : F.first(nodes));
9698

9799
streams = new ConcurrentHashMap<>(nodes.size());
98100

@@ -118,19 +120,24 @@ protected GridCacheDistributedQueryFuture(
118120
}
119121

120122
/**
121-
* @return Nodes for query execution.
123+
* @return A local node if available, otherwise a random node from the given collection.
122124
*/
123-
private ClusterNode node(Collection<ClusterNode> nodes) {
124-
ClusterNode rmtNode = null;
125+
private static ClusterNode localOrRemoteNode(Collection<ClusterNode> nodes) {
126+
int remoteNodeIdx = ThreadLocalRandom.current().nextInt(nodes.size());
127+
128+
ClusterNode remoteNode = null;
125129

126130
for (ClusterNode node : nodes) {
127131
if (node.isLocal())
128132
return node;
129133

130-
rmtNode = node;
134+
if (remoteNodeIdx-- == 0)
135+
remoteNode = node;
131136
}
132137

133-
return rmtNode;
138+
assert remoteNode != null;
139+
140+
return remoteNode;
134141
}
135142

136143
/** {@inheritDoc} */
@@ -282,7 +289,7 @@ private void cancelPages(UUID nodeId) {
282289
GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());
283290

284291
if (nodeId.equals(cctx.localNodeId())) {
285-
// Process cancel query directly (without sending) for local node,
292+
// Process cancel query directly (without sending) for local node.
286293
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
287294
@Override public Object call() {
288295
qryMgr.processQueryRequest(cctx.localNodeId(), req);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.ignite.IgniteException;
4747
import org.apache.ignite.IgniteSystemProperties;
4848
import org.apache.ignite.cache.CacheEntry;
49-
import org.apache.ignite.cache.CacheMode;
5049
import org.apache.ignite.cache.QueryIndexType;
5150
import org.apache.ignite.cache.query.IndexQuery;
5251
import org.apache.ignite.cache.query.QueryMetrics;
@@ -1124,10 +1123,7 @@ public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws Ignite
11241123
if (part != null) {
11251124
final GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(part);
11261125

1127-
boolean nonLocPart = locPart == null || locPart.state() != OWNING ||
1128-
(cctx.config().getCacheMode() == CacheMode.PARTITIONED && !locPart.primary(AffinityTopologyVersion.NONE));
1129-
1130-
if (nonLocPart) {
1126+
if (locPart == null || locPart.state() != OWNING) {
11311127
throw new CacheInvalidStateException("Failed to execute index query because required partition " +
11321128
"has not been found on local node [cacheName=" + cctx.name() + ", part=" + part + "]");
11331129
}

0 commit comments

Comments
 (0)