Skip to content

Commit af5d1dd

Browse files
IGNITE-23975 SQL Calcite: Add group partitions reservation - Fixes #11758.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent d593367 commit af5d1dd

File tree

16 files changed

+582
-454
lines changed

16 files changed

+582
-454
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,18 @@
1818
package org.apache.ignite.internal.processors.query.calcite.exec;
1919

2020
import java.util.ArrayList;
21-
import java.util.Collections;
21+
import java.util.Collection;
2222
import java.util.Iterator;
2323
import java.util.List;
24+
import java.util.stream.IntStream;
25+
import org.apache.ignite.IgniteCheckedException;
2426
import org.apache.ignite.cluster.ClusterTopologyException;
2527
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2628
import org.apache.ignite.internal.processors.cache.GridCacheContext;
2729
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
2830
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
29-
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
3031
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
31-
import org.apache.ignite.internal.util.typedef.F;
32+
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
3233

3334
/** */
3435
public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoCloseable {
@@ -45,15 +46,35 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
4546
protected final int[] parts;
4647

4748
/** */
48-
protected volatile List<GridDhtLocalPartition> reserved;
49+
protected final boolean explicitParts;
50+
51+
/** */
52+
private PartitionReservation reservation;
53+
54+
/** */
55+
protected volatile List<GridDhtLocalPartition> reservedParts;
4956

5057
/** */
5158
AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx, int[] parts) {
5259
this.ectx = ectx;
5360
this.cctx = cctx;
54-
this.parts = parts;
5561

5662
topVer = ectx.topologyVersion();
63+
64+
explicitParts = parts != null;
65+
66+
if (cctx.isReplicated())
67+
this.parts = IntStream.range(0, cctx.affinity().partitions()).toArray();
68+
else {
69+
if (parts != null)
70+
this.parts = parts;
71+
else {
72+
Collection<Integer> primaryParts = cctx.affinity().primaryPartitions(
73+
cctx.kernalContext().localNodeId(), topVer);
74+
75+
this.parts = primaryParts.stream().mapToInt(Integer::intValue).toArray();
76+
}
77+
}
5778
}
5879

5980
/** {@inheritDoc} */
@@ -80,7 +101,7 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
80101

81102
/** */
82103
private synchronized void reserve() {
83-
if (reserved != null)
104+
if (reservation != null)
84105
return;
85106

86107
GridDhtPartitionTopology top = cctx.topology();
@@ -98,61 +119,42 @@ private synchronized void reserve() {
98119
throw new ClusterTopologyException("Topology was changed. Please retry on stable topology.");
99120
}
100121

101-
List<GridDhtLocalPartition> toReserve;
102-
103-
if (cctx.isReplicated()) {
104-
int partsCnt = cctx.affinity().partitions();
105-
106-
toReserve = new ArrayList<>(partsCnt);
107-
108-
for (int i = 0; i < partsCnt; i++)
109-
toReserve.add(top.localPartition(i));
110-
}
111-
else if (cctx.isPartitioned()) {
112-
assert parts != null;
122+
try {
123+
PartitionReservation reservation;
113124

114-
toReserve = new ArrayList<>(parts.length);
125+
try {
126+
reservation = cctx.kernalContext().query().partitionReservationManager().reservePartitions(
127+
cctx, topVer, explicitParts ? parts : null, ectx.originatingNodeId(), "qryId=" + ectx.queryId());
128+
}
129+
catch (IgniteCheckedException e) {
130+
throw new ClusterTopologyException("Failed to reserve partition for query execution", e);
131+
}
115132

116-
for (int i = 0; i < parts.length; i++)
117-
toReserve.add(top.localPartition(parts[i]));
118-
}
119-
else
120-
toReserve = Collections.emptyList();
133+
if (reservation.failed()) {
134+
reservation.release();
121135

122-
List<GridDhtLocalPartition> reserved = new ArrayList<>(toReserve.size());
136+
throw new ClusterTopologyException(reservation.error());
137+
}
123138

124-
try {
125-
for (GridDhtLocalPartition part : toReserve) {
126-
if (part == null || !part.reserve())
127-
throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
128-
else if (part.state() != GridDhtPartitionState.OWNING) {
129-
part.release();
139+
this.reservation = reservation;
130140

131-
throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
132-
}
141+
List<GridDhtLocalPartition> reservedParts = new ArrayList<>(parts.length);
133142

134-
reserved.add(part);
135-
}
136-
}
137-
catch (Exception e) {
138-
release();
143+
for (int i = 0; i < parts.length; i++)
144+
reservedParts.add(top.localPartition(parts[i]));
139145

140-
throw e;
146+
this.reservedParts = reservedParts;
141147
}
142148
finally {
143-
this.reserved = reserved;
144-
145149
top.readUnlock();
146150
}
147151
}
148152

149153
/** */
150154
private synchronized void release() {
151-
if (F.isEmpty(reserved))
152-
return;
153-
154-
reserved.forEach(GridDhtLocalPartition::release);
155+
if (reservation != null)
156+
reservation.release();
155157

156-
reserved = null;
158+
reservation = null;
157159
}
158160
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public IndexScan(
116116

117117
txChanges = ectx.transactionChanges(
118118
cctx.cacheId(),
119-
parts,
119+
cctx.isReplicated() ? null : this.parts,
120120
r -> new IndexRowImpl(rowHnd, r),
121121
this::compare
122122
);

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ private class IteratorImpl extends GridIteratorAdapter<Row> {
7474

7575
/** */
7676
private IteratorImpl() {
77-
assert reserved != null;
77+
assert reservedParts != null;
7878

79-
parts = new ArrayDeque<>(reserved);
79+
parts = new ArrayDeque<>(reservedParts);
8080

8181
txChanges = F.isEmpty(ectx.getQryTxEntries())
8282
? TransactionChanges.empty()

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ public class ColocationGroup implements MarshalableMessage {
5858
@GridDirectTransient
5959
private List<List<UUID>> assignments;
6060

61+
/**
62+
* Flag, indacating that assignment is formed by original cache assignment for given topology.
63+
* In case of {@code true} value we can skip assignment marshalling and calc assignment on remote nodes.
64+
*/
65+
@GridDirectTransient
66+
private boolean primaryAssignment;
67+
6168
/** Marshalled assignments. */
6269
private int[] marshalledAssignments;
6370

@@ -68,7 +75,7 @@ public static ColocationGroup forNodes(List<UUID> nodeIds) {
6875

6976
/** */
7077
public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
71-
return new ColocationGroup(null, null, assignments);
78+
return new ColocationGroup(null, null, assignments, true);
7279
}
7380

7481
/** */
@@ -100,6 +107,13 @@ private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> a
100107
this.assignments = assignments;
101108
}
102109

110+
/** */
111+
private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, List<List<UUID>> assignments, boolean primaryAssignment) {
112+
this(sourceIds, nodeIds, assignments);
113+
114+
this.primaryAssignment = primaryAssignment;
115+
}
116+
103117
/**
104118
* @return Lists of nodes capable to execute a query fragment for what the mapping is calculated.
105119
*/
@@ -143,10 +157,10 @@ public boolean belongs(long sourceId) {
143157
*/
144158
public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingException {
145159
long[] srcIds;
146-
if (this.sourceIds == null || other.sourceIds == null)
147-
srcIds = U.firstNotNull(this.sourceIds, other.sourceIds);
160+
if (sourceIds == null || other.sourceIds == null)
161+
srcIds = U.firstNotNull(sourceIds, other.sourceIds);
148162
else
149-
srcIds = LongStream.concat(Arrays.stream(this.sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();
163+
srcIds = LongStream.concat(Arrays.stream(sourceIds), Arrays.stream(other.sourceIds)).distinct().toArray();
150164

151165
List<UUID> nodeIds;
152166
if (this.nodeIds == null || other.nodeIds == null)
@@ -159,6 +173,8 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
159173
"Replicated query parts are not co-located on all nodes");
160174
}
161175

176+
boolean primaryAssignment = this.primaryAssignment || other.primaryAssignment;
177+
162178
List<List<UUID>> assignments;
163179
if (this.assignments == null || other.assignments == null) {
164180
assignments = U.firstNotNull(this.assignments, other.assignments);
@@ -170,11 +186,14 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
170186
for (int i = 0; i < assignments.size(); i++) {
171187
List<UUID> assignment = Commons.intersect(filter, assignments.get(i));
172188

173-
if (assignment.isEmpty()) { // TODO check with partition filters
189+
if (assignment.isEmpty()) {
174190
throw new ColocationMappingException("Failed to map fragment to location. " +
175191
"Partition mapping is empty [part=" + i + "]");
176192
}
177193

194+
if (!assignment.get(0).equals(assignments.get(i).get(0)))
195+
primaryAssignment = false;
196+
178197
assignments0.add(assignment);
179198
}
180199

@@ -191,14 +210,20 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
191210
if (filter != null)
192211
assignment.retainAll(filter);
193212

194-
if (assignment.isEmpty()) // TODO check with partition filters
195-
throw new ColocationMappingException("Failed to map fragment to location. Partition mapping is empty [part=" + i + "]");
213+
if (assignment.isEmpty()) {
214+
throw new ColocationMappingException("Failed to map fragment to location. " +
215+
"Partition mapping is empty [part=" + i + "]");
216+
}
217+
218+
if (!assignment.get(0).equals(this.assignments.get(i).get(0))
219+
|| !assignment.get(0).equals(other.assignments.get(i).get(0)))
220+
primaryAssignment = false;
196221

197222
assignments.add(assignment);
198223
}
199224
}
200225

201-
return new ColocationGroup(srcIds, nodeIds, assignments);
226+
return new ColocationGroup(srcIds, nodeIds, assignments, primaryAssignment);
202227
}
203228

204229
/** */
@@ -216,7 +241,16 @@ public ColocationGroup finalizeMapping() {
216241
assignments.add(first != null ? Collections.singletonList(first) : Collections.emptyList());
217242
}
218243

219-
return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments);
244+
return new ColocationGroup(sourceIds, new ArrayList<>(nodes), assignments, primaryAssignment);
245+
}
246+
247+
/** */
248+
public ColocationGroup explicitMapping() {
249+
if (assignments == null || !primaryAssignment)
250+
return this;
251+
252+
// Make a shallow copy without cacheAssignment flag.
253+
return new ColocationGroup(sourceIds, nodeIds, assignments, false);
220254
}
221255

222256
/** */
@@ -359,7 +393,7 @@ public int[] partitions(UUID nodeId) {
359393

360394
/** {@inheritDoc} */
361395
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
362-
if (assignments != null && marshalledAssignments == null) {
396+
if (assignments != null && marshalledAssignments == null && !primaryAssignment) {
363397
Map<UUID, Integer> nodeIdxs = new HashMap<>();
364398

365399
for (int i = 0; i < nodeIds.size(); i++)

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,11 @@ public FragmentMapping mapping() {
192192
if (mapping != null)
193193
mapping.prepareMarshal(ctx);
194194

195-
if (target != null)
195+
if (target != null) {
196+
target = target.explicitMapping();
197+
196198
target.prepareMarshal(ctx);
199+
}
197200

198201
if (remoteSources0 == null && remoteSources != null) {
199202
remoteSources0 = U.newHashMap(remoteSources.size());

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -599,18 +599,25 @@ private ColocationGroup replicatedGroup(@NotNull AffinityTopologyVersion topVer)
599599
List<ClusterNode> nodes = cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
600600
List<UUID> nodes0;
601601

602-
if (!top.rebalanceFinished(topVer)) {
603-
nodes0 = new ArrayList<>(nodes.size());
602+
top.readLock();
604603

605-
int parts = top.partitions();
604+
try {
605+
if (!top.rebalanceFinished(topVer)) {
606+
nodes0 = new ArrayList<>(nodes.size());
607+
608+
int parts = top.partitions();
606609

607-
for (ClusterNode node : nodes) {
608-
if (isOwner(node.id(), top, parts))
609-
nodes0.add(node.id());
610+
for (ClusterNode node : nodes) {
611+
if (isOwner(node.id(), top, parts))
612+
nodes0.add(node.id());
613+
}
610614
}
615+
else
616+
nodes0 = Commons.transform(nodes, ClusterNode::id);
617+
}
618+
finally {
619+
top.readUnlock();
611620
}
612-
else
613-
nodes0 = Commons.transform(nodes, ClusterNode::id);
614621

615622
return ColocationGroup.forNodes(nodes0);
616623
}

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservation.java renamed to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.ignite.internal.processors.query.h2.twostep;
18+
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
1919

2020
import java.util.List;
2121
import java.util.concurrent.atomic.AtomicBoolean;

modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java renamed to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionReservationKey.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.ignite.internal.processors.query.h2.twostep;
18+
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
1919

2020
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2121
import org.apache.ignite.internal.util.typedef.F;

0 commit comments

Comments
 (0)