Skip to content

Commit d385d78

Browse files
IGNITE-27430 SQL Calcite: Optimize IndexScan node creation - Fixes #12599.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent 80a9fb1 commit d385d78

File tree

11 files changed

+127
-60
lines changed

11 files changed

+127
-60
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.exec;
1919

20-
import java.util.ArrayList;
20+
import java.util.BitSet;
2121
import java.util.Collection;
2222
import java.util.Iterator;
23-
import java.util.List;
24-
import java.util.stream.IntStream;
2523
import org.apache.ignite.IgniteCheckedException;
2624
import org.apache.ignite.cluster.ClusterTopologyException;
2725
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2826
import org.apache.ignite.internal.processors.cache.GridCacheContext;
2927
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
30-
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
3128
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
3229
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionReservation;
30+
import org.apache.ignite.internal.util.collection.IntSet;
31+
import org.jetbrains.annotations.Nullable;
3332

3433
/** */
3534
public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoCloseable {
@@ -43,36 +42,47 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
4342
protected final AffinityTopologyVersion topVer;
4443

4544
/** */
46-
protected final int[] parts;
45+
protected final BitSet parts;
4746

4847
/** */
49-
protected final boolean explicitParts;
48+
private final @Nullable int[] explicitParts;
5049

5150
/** */
5251
private PartitionReservation reservation;
5352

5453
/** */
55-
protected volatile List<GridDhtLocalPartition> reservedParts;
56-
57-
/** */
58-
AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx, int[] parts) {
54+
AbstractCacheScan(ExecutionContext<Row> ectx, GridCacheContext<?, ?> cctx, @Nullable int[] explicitParts) {
5955
this.ectx = ectx;
6056
this.cctx = cctx;
6157

6258
topVer = ectx.topologyVersion();
6359

64-
explicitParts = parts != null;
60+
this.explicitParts = explicitParts;
61+
62+
int partsCnt = cctx.affinity().partitions();
6563

66-
if (cctx.isReplicated())
67-
this.parts = IntStream.range(0, cctx.affinity().partitions()).toArray();
64+
if (cctx.isReplicated()) {
65+
parts = new BitSet(partsCnt);
66+
parts.set(0, partsCnt);
67+
}
6868
else {
69-
if (parts != null)
70-
this.parts = parts;
69+
if (explicitParts != null) {
70+
parts = new BitSet(partsCnt);
71+
72+
for (int i = 0; i < explicitParts.length; i++)
73+
parts.set(explicitParts[i]);
74+
}
7175
else {
7276
Collection<Integer> primaryParts = cctx.affinity().primaryPartitions(
7377
cctx.kernalContext().localNodeId(), topVer);
7478

75-
this.parts = primaryParts.stream().mapToInt(Integer::intValue).toArray();
79+
if (primaryParts instanceof IntSet)
80+
parts = ((IntSet)primaryParts).toBitSet();
81+
else {
82+
parts = new BitSet(partsCnt);
83+
84+
primaryParts.forEach(parts::set);
85+
}
7686
}
7787
}
7888
}
@@ -124,7 +134,7 @@ private synchronized void reserve() {
124134

125135
try {
126136
reservation = cctx.kernalContext().query().partitionReservationManager().reservePartitions(
127-
cctx, topVer, explicitParts ? parts : null, ectx.originatingNodeId(), "qryId=" + ectx.queryId());
137+
cctx, topVer, explicitParts, ectx.originatingNodeId(), "qryId=" + ectx.queryId());
128138
}
129139
catch (IgniteCheckedException e) {
130140
throw new ClusterTopologyException("Failed to reserve partition for query execution", e);
@@ -138,18 +148,18 @@ private synchronized void reserve() {
138148

139149
this.reservation = reservation;
140150

141-
List<GridDhtLocalPartition> reservedParts = new ArrayList<>(parts.length);
142-
143-
for (int i = 0; i < parts.length; i++)
144-
reservedParts.add(top.localPartition(parts[i]));
145-
146-
this.reservedParts = reservedParts;
151+
processReservedTopology(top);
147152
}
148153
finally {
149154
top.readUnlock();
150155
}
151156
}
152157

158+
/** */
159+
protected void processReservedTopology(GridDhtPartitionTopology top) {
160+
// No-op.
161+
}
162+
153163
/** */
154164
private synchronized void release() {
155165
if (reservation != null)

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import java.lang.reflect.Type;
2121
import java.util.ArrayList;
22-
import java.util.Arrays;
22+
import java.util.BitSet;
2323
import java.util.Collection;
2424
import java.util.Comparator;
2525
import java.util.HashSet;
@@ -369,17 +369,13 @@ public static Collection<QueryTxEntry> transactionChanges(
369369
*/
370370
public <R> TransactionChanges<R> transactionChanges(
371371
int cacheId,
372-
int[] parts,
372+
BitSet parts,
373373
Function<CacheDataRow, R> mapper,
374374
@Nullable Comparator<R> cmp
375375
) {
376376
if (F.isEmpty(qryTxEntries))
377377
return TransactionChanges.empty();
378378

379-
// Expecting parts are sorted or almost sorted and amount of transaction entries are relatively small.
380-
if (parts != null && !F.isSorted(parts))
381-
Arrays.sort(parts);
382-
383379
Set<KeyCacheObject> changedKeys = new HashSet<>(qryTxEntries.size());
384380
List<R> newAndUpdatedRows = new ArrayList<>(qryTxEntries.size());
385381

@@ -391,7 +387,7 @@ public <R> TransactionChanges<R> transactionChanges(
391387
if (e.cacheId() != cacheId)
392388
continue;
393389

394-
if (parts != null && Arrays.binarySearch(parts, part) < 0)
390+
if (parts != null && !parts.get(part))
395391
continue;
396392

397393
changedKeys.add(e.key());

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ private boolean hasExchange(RelNode rel) {
368368
ImmutableBitSet requiredColumns = rel.requiredColumns();
369369
List<SearchBounds> searchBounds = rel.searchBounds();
370370

371-
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
371+
RelDataType rowType = rel.getDataSourceRowType();
372372

373373
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
374374
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
@@ -546,9 +546,8 @@ private boolean hasExchange(RelNode rel) {
546546
ImmutableBitSet requiredColumns = rel.requiredColumns();
547547

548548
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
549-
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
550549

551-
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
550+
RelDataType rowType = rel.getDataSourceRowType();
552551

553552
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
554553
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TableScan.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
package org.apache.ignite.internal.processors.query.calcite.exec;
1919

2020
import java.util.ArrayDeque;
21+
import java.util.ArrayList;
2122
import java.util.Collections;
2223
import java.util.Iterator;
24+
import java.util.List;
2325
import java.util.NoSuchElementException;
2426
import java.util.Queue;
2527
import java.util.function.Function;
2628
import org.apache.calcite.util.ImmutableBitSet;
2729
import org.apache.ignite.IgniteCheckedException;
2830
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
31+
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
2932
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
3033
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
3134
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
@@ -38,6 +41,9 @@
3841

3942
/** */
4043
public class TableScan<Row> extends AbstractCacheColumnsScan<Row> {
44+
/** */
45+
protected volatile List<GridDhtLocalPartition> reservedParts;
46+
4147
/** */
4248
public TableScan(
4349
ExecutionContext<Row> ectx,
@@ -53,6 +59,16 @@ public TableScan(
5359
return new IteratorImpl();
5460
}
5561

62+
/** {@inheritDoc} */
63+
@Override protected void processReservedTopology(GridDhtPartitionTopology top) {
64+
List<GridDhtLocalPartition> reservedParts = new ArrayList<>(parts.cardinality());
65+
66+
for (int part = parts.nextSetBit(0); part >= 0; part = parts.nextSetBit(part + 1))
67+
reservedParts.add(top.localPartition(part));
68+
69+
this.reservedParts = reservedParts;
70+
}
71+
5672
/**
5773
* Table scan iterator.
5874
*/

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/ExpressionFactoryImpl.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ public class ExpressionFactoryImpl<Row> implements ExpressionFactory<Row> {
105105
private final RexBuilder rexBuilder;
106106

107107
/** */
108-
private final RelDataType emptyType;
108+
private static final RelDataType EMPTY_TYPE = new RelDataTypeFactory.Builder(Commons.typeFactory()).build();
109109

110110
/** */
111-
private final RelDataType nullType;
111+
private static final RelDataType NULL_TYPE = Commons.typeFactory().createSqlType(SqlTypeName.NULL);
112112

113113
/** */
114-
private final RelDataType booleanType;
114+
private static final RelDataType BOOLEAN_TYPE = Commons.typeFactory().createJavaType(Boolean.class);
115115

116116
/** */
117117
private final ExecutionContext<Row> ctx;
@@ -127,10 +127,6 @@ public ExpressionFactoryImpl(
127127
this.typeFactory = typeFactory;
128128
this.conformance = conformance;
129129
this.rexBuilder = rexBuilder;
130-
131-
emptyType = new RelDataTypeFactory.Builder(this.typeFactory).build();
132-
nullType = typeFactory.createSqlType(SqlTypeName.NULL);
133-
booleanType = typeFactory.createJavaType(Boolean.class);
134130
}
135131

136132
/** {@inheritDoc} */
@@ -296,7 +292,7 @@ else if (o2 == null)
296292
/** {@inheritDoc} */
297293
@Override public Supplier<Row> rowSource(List<RexNode> values) {
298294
return new ValuesImpl(scalar(values, null), ctx.rowHandler().factory(typeFactory,
299-
Commons.transform(values, v -> v != null ? v.getType() : nullType)));
295+
Commons.transform(values, v -> v != null ? v.getType() : NULL_TYPE)));
300296
}
301297

302298
/** {@inheritDoc} */
@@ -494,7 +490,7 @@ private BiScalar biScalar(RexNode node, RelDataType type) {
494490
/** */
495491
private Scalar compile(List<RexNode> nodes, RelDataType type, boolean biInParams) {
496492
if (type == null)
497-
type = emptyType;
493+
type = EMPTY_TYPE;
498494

499495
RexProgramBuilder programBuilder = new RexProgramBuilder(type, rexBuilder);
500496

@@ -508,8 +504,8 @@ private Scalar compile(List<RexNode> nodes, RelDataType type, boolean biInParams
508504
else {
509505
unspecifiedValues.set(i);
510506

511-
programBuilder.addProject(rexBuilder.makeNullLiteral(type == emptyType ?
512-
nullType : type.getFieldList().get(i).getType()), null);
507+
programBuilder.addProject(rexBuilder.makeNullLiteral(type == EMPTY_TYPE ?
508+
NULL_TYPE : type.getFieldList().get(i).getType()), null);
513509
}
514510
}
515511

@@ -637,7 +633,7 @@ private abstract class AbstractScalarPredicate<T extends Scalar> {
637633
private AbstractScalarPredicate(T scalar) {
638634
this.scalar = scalar;
639635
hnd = ctx.rowHandler();
640-
out = hnd.factory(typeFactory, booleanType).create();
636+
out = hnd.factory(typeFactory, BOOLEAN_TYPE).create();
641637
}
642638
}
643639

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public abstract class ProjectableFilterableTableScan extends TableScan {
6363
/** Participating columns. */
6464
protected final ImmutableBitSet requiredColumns;
6565

66+
/** Required columns from table row type (No need to be serialized, for caching only). */
67+
protected RelDataType dataSourceRowType;
68+
6669
/** */
6770
protected ProjectableFilterableTableScan(
6871
RelOptCluster cluster,
@@ -151,6 +154,16 @@ protected RelWriter explainTerms0(RelWriter pw) {
151154
return table.unwrap(IgniteTable.class).getRowType(Commons.typeFactory(getCluster()), requiredColumns);
152155
}
153156

157+
/** */
158+
public RelDataType getDataSourceRowType() {
159+
if (dataSourceRowType == null) {
160+
dataSourceRowType = table.unwrap(IgniteTable.class).getRowType(Commons.typeFactory(getCluster()),
161+
requiredColumns);
162+
}
163+
164+
return dataSourceRowType;
165+
}
166+
154167
/** */
155168
public RexNode pushUpPredicate() {
156169
if (condition == null || projects == null)

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ public class CacheTableDescriptorImpl extends NullInitializerExpressionFactory
112112
/** */
113113
private final ImmutableBitSet insertFields;
114114

115+
/** */
116+
private RelDataType tableRowType;
117+
115118
/** */
116119
public CacheTableDescriptorImpl(GridCacheContextInfo<?, ?> cacheInfo, GridQueryTypeDescriptor typeDesc,
117120
Object affinityIdentity) {
@@ -490,18 +493,23 @@ private <Row> ModifyTuple deleteTuple(Row row, ExecutionContext<Row> ectx) {
490493

491494
/** {@inheritDoc} */
492495
@Override public RelDataType rowType(IgniteTypeFactory factory, ImmutableBitSet usedColumns) {
496+
if (usedColumns == null && tableRowType != null)
497+
return tableRowType;
498+
493499
RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(factory);
494500

495501
if (usedColumns == null) {
496502
for (int i = 0; i < descriptors.length; i++)
497503
b.add(descriptors[i].name(), descriptors[i].logicalType(factory));
504+
505+
return tableRowType = b.build();
498506
}
499507
else {
500508
for (int i = usedColumns.nextSetBit(0); i != -1; i = usedColumns.nextSetBit(i + 1))
501509
b.add(descriptors[i].name(), descriptors[i].logicalType(factory));
502-
}
503510

504-
return b.build();
511+
return b.build();
512+
}
505513
}
506514

507515
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/util/collection/BitSetIntSet.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ public BitSetIntSet(int initCap, Collection<Integer> coll) {
177177
return arr;
178178
}
179179

180+
/** {@inheritDoc} */
181+
@Override public BitSet toBitSet() {
182+
return (BitSet)bitSet.clone();
183+
}
184+
180185
/** {@inheritDoc} */
181186
@Override public boolean containsAll(@NotNull Collection<?> c) {
182187
for (Object o : c) {

modules/core/src/main/java/org/apache/ignite/internal/util/collection/ImmutableIntSet.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.ignite.internal.util.collection;
2020

21+
import java.util.BitSet;
2122
import java.util.Collection;
2223
import java.util.Iterator;
2324
import java.util.Set;
@@ -88,6 +89,19 @@ public ImmutableIntSet(Set<Integer> delegate) {
8889
return delegate.toArray();
8990
}
9091

92+
/** {@inheritDoc} */
93+
@Override public BitSet toBitSet() {
94+
if (delegate instanceof IntSet)
95+
return ((IntSet)delegate).toBitSet();
96+
else {
97+
BitSet bitSet = new BitSet();
98+
99+
forEach(bitSet::set);
100+
101+
return bitSet;
102+
}
103+
}
104+
91105
/** {@inheritDoc} */
92106
@NotNull @Override public <T> T[] toArray(@NotNull T[] a) {
93107
return delegate.toArray(a);

0 commit comments

Comments
 (0)