Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
Expand Down Expand Up @@ -840,9 +839,12 @@ private Collection<ClusterNode> nodes() throws IgniteCheckedException {
}

/**
* Collects query data nodes matching specified {@code prj} and {@code part}.
*
* @param cctx Cache context.
* @param prj Projection (optional).
* @return Collection of data nodes in provided projection (if any).
* @param part Partition (optional).
* @return Collection of data nodes matching specified {@code prj} and {@code part}.
* @throws IgniteCheckedException If partition number is invalid.
*/
private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
Expand All @@ -856,19 +858,14 @@ private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx,
if (prj == null && part == null)
return affNodes;

if (part != null && part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + part);
if (part != null) {
if (part >= cctx.affinity().partitions())
throw new IgniteCheckedException("Invalid partition number: " + part);

final Set<ClusterNode> owners =
part == null ? Collections.<ClusterNode>emptySet() : new HashSet<>(cctx.topology().owners(part, topVer));
affNodes = cctx.topology().nodes(part, topVer);
}

return F.view(affNodes, new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode n) {
return cctx.discovery().cacheAffinityNode(n, cctx.name()) &&
(prj == null || prj.node(n.id()) != null) &&
(part == null || owners.contains(n));
}
});
return prj == null ? affNodes : F.view(affNodes, n -> prj.node(n.id()) != null);
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
Expand All @@ -38,6 +39,7 @@
import org.apache.ignite.internal.processors.cache.query.reducer.TextQueryReducer;
import org.apache.ignite.internal.processors.cache.query.reducer.UnsortedCacheQueryReducer;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;

import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.INDEX;
Expand Down Expand Up @@ -92,7 +94,7 @@ protected GridCacheDistributedQueryFuture(
qryMgr = (GridCacheDistributedQueryManager<K, V>)ctx.queries();

if (qry.query().partition() != null)
nodes = Collections.singletonList(node(nodes));
nodes = Collections.singletonList(cctx.isReplicated() ? localOrRemoteNode(nodes) : F.first(nodes));

streams = new ConcurrentHashMap<>(nodes.size());

Expand All @@ -118,18 +120,23 @@ protected GridCacheDistributedQueryFuture(
}

/**
* @return Nodes for query execution.
* @return A local node if available, otherwise a random node from the given collection.
*/
private ClusterNode node(Collection<ClusterNode> nodes) {
private ClusterNode localOrRemoteNode(Collection<ClusterNode> nodes) {
int remoteNodeIdx = ThreadLocalRandom.current().nextInt(nodes.size());

ClusterNode rmtNode = null;

for (ClusterNode node : nodes) {
if (node.isLocal())
return node;

rmtNode = node;
if (remoteNodeIdx-- == 0)
rmtNode = node;
}

assert rmtNode != null;

return rmtNode;
}

Expand Down Expand Up @@ -282,7 +289,7 @@ private void cancelPages(UUID nodeId) {
GridCacheQueryRequest req = GridCacheQueryRequest.cancelRequest(cctx, reqId, fields());

if (nodeId.equals(cctx.localNodeId())) {
// Process cancel query directly (without sending) for local node,
// Process cancel query directly (without sending) for local node.
cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
@Override public Object call() {
qryMgr.processQueryRequest(cctx.localNodeId(), req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,11 +1121,11 @@ public GridCloseableIterator indexQueryLocal(final CacheQuery qry) throws Ignite
int[] parts = null;

if (part != null) {
final GridDhtLocalPartition locPart = cctx.dht().topology().localPartition(part);
AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();

if (locPart == null || locPart.state() != OWNING) {
if (cctx.isPartitioned() && !cctx.affinity().primaryByPartition(cctx.localNode(), part, topVer)) {
throw new CacheInvalidStateException("Failed to execute index query because required partition " +
"has not been found on local node [cacheName=" + cctx.name() + ", part=" + part + "]");
"is not primary on local node [cacheName=" + cctx.name() + ", part=" + part + ", topVer=" + topVer + ']');
}

parts = new int[] {part};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static List<Object[]> params() {
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setCacheMode(cacheMode)
.setIndexedTypes(Integer.class, Person.class)
.setBackups(1)
.setAffinity(new RendezvousAffinityFunction().setPartitions(100));

cfg.setCacheConfiguration(ccfg);
Expand Down Expand Up @@ -136,7 +137,7 @@ public void testSinglePartition() {
}
}

assertEquals(sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
assertEquals("part=" + part, sendReq, TestRecordingCommunicationSpi.spi(grid()).recordedMessages(true).size());
}
}

Expand Down Expand Up @@ -181,7 +182,7 @@ public void testLocalWithPartition() {
GridTestUtils.assertThrows(null, () -> grid().cache("CACHE").query(qry).getAll(),
client ? IgniteException.class : CacheInvalidStateException.class,
client ? "Failed to execute local index query on a client node." :
"Failed to execute index query because required partition has not been found on local node");
"Failed to execute index query because required partition is not primary on local node");
}
else
assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());
Expand Down
Loading