Skip to content

Commit be1f4bc

Browse files
IGNITE-24719 SQL Calcite: Fix mapping sending for trim exchange - Fixes #11912.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent c743f41 commit be1f4bc

File tree

4 files changed

+49
-5
lines changed

4 files changed

+49
-5
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.ByteBuffer;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Set;
2324
import java.util.UUID;
2425
import java.util.function.Supplier;
2526
import java.util.stream.Collectors;
@@ -167,6 +168,16 @@ else if (grps.size() > 1)
167168
return F.first(grps);
168169
}
169170

171+
/** Create fragment mapping with explicit mapping for groups by source ids. */
172+
public FragmentMapping explicitMapping(Set<Long> srcIds) {
173+
Set<ColocationGroup> explicitMappingGrps = U.newIdentityHashSet();
174+
175+
srcIds.forEach(srcId -> explicitMappingGrps.add(findGroup(srcId)));
176+
177+
return new FragmentMapping(Commons.transform(colocationGroups,
178+
g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g));
179+
}
180+
170181
/** {@inheritDoc} */
171182
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
172183
colocationGroups.forEach(g -> g.prepareMarshal(ctx));

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.prepare;
1919

20+
import java.util.HashSet;
2021
import java.util.List;
22+
import java.util.Set;
2123
import java.util.UUID;
2224
import java.util.concurrent.ThreadLocalRandom;
2325
import java.util.function.Supplier;
@@ -34,6 +36,7 @@
3436
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
3537
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
3638
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
39+
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange;
3740
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
3841
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
3942
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -153,6 +156,22 @@ private FragmentMapping mapping(MappingQueryContext ctx, RelMetadataQuery mq, Su
153156
else
154157
mapping = FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
155158
}
159+
else {
160+
// For remote fragments we should ensure that mapping is explicitely sent to remote nodes if
161+
// it's required by trim exchange.
162+
Set<Long> trimSrcIds = new HashSet<>();
163+
164+
root.accept(new IgniteRelShuttle() {
165+
@Override public IgniteRel visit(IgniteTrimExchange rel) {
166+
trimSrcIds.add(rel.sourceId());
167+
168+
return super.visit(rel);
169+
}
170+
});
171+
172+
if (!trimSrcIds.isEmpty())
173+
mapping = mapping.explicitMapping(trimSrcIds);
174+
}
156175

157176
if (single() && mapping.nodeIds().size() > 1) {
158177
// this is possible when the fragment contains scan of a replicated cache, which brings

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java renamed to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/DistributedJoinIntegrationTest.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
import org.junit.Test;
2424

2525
/** */
26-
public class JoinRehashIntegrationTest extends AbstractBasicIntegrationTransactionalTest {
26+
public class DistributedJoinIntegrationTest extends AbstractBasicIntegrationTransactionalTest {
2727
/** {@inheritDoc} */
2828
@Override protected int nodeCount() {
2929
return 3;
3030
}
3131

3232
/** Test that resources (in particular inboxes) are cleaned up after executing join with rehashing. */
3333
@Test
34-
public void testResourceCleanup() throws Exception {
34+
public void testRehashResourceCleanup() throws Exception {
3535
prepareTables();
3636

3737
String sql = "SELECT sum(i.price * i.amount)" +
@@ -55,7 +55,7 @@ public void testResourceCleanup() throws Exception {
5555

5656
/** Tests that null values are filtered out on rehashing. */
5757
@Test
58-
public void testNullAffinityKeys() {
58+
public void testRehashNullAffinityKeys() {
5959
prepareTables();
6060

6161
// Add null values.
@@ -86,6 +86,20 @@ public void testRehashOnRightHand() {
8686
.check();
8787
}
8888

89+
/** */
90+
@Test
91+
public void testTrimExchange() {
92+
sql("CREATE TABLE order_ids(id INTEGER PRIMARY KEY) WITH TEMPLATE=REPLICATED," + atomicity());
93+
prepareTables();
94+
95+
sql("INSERT INTO order_ids(id) SELECT id FROM orders");
96+
97+
assertQuery("SELECT sum(o.id) FROM orders o JOIN order_ids oid ON (o.id = oid.id)")
98+
.matches(QueryChecker.containsSubPlan("IgniteTrimExchange"))
99+
.returns(435L)
100+
.check();
101+
}
102+
89103
/** Prepare tables orders and order_items with data. */
90104
private void prepareTables() {
91105
sql("CREATE TABLE items (\n" +

modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.ignite.internal.processors.query.calcite.integration.CorrelatesIntegrationTest;
3131
import org.apache.ignite.internal.processors.query.calcite.integration.DataTypesTest;
3232
import org.apache.ignite.internal.processors.query.calcite.integration.DateTimeTest;
33+
import org.apache.ignite.internal.processors.query.calcite.integration.DistributedJoinIntegrationTest;
3334
import org.apache.ignite.internal.processors.query.calcite.integration.DynamicParametersIntegrationTest;
3435
import org.apache.ignite.internal.processors.query.calcite.integration.ExpiredEntriesIntegrationTest;
3536
import org.apache.ignite.internal.processors.query.calcite.integration.FunctionsTest;
@@ -42,7 +43,6 @@
4243
import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
4344
import org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
4445
import org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
45-
import org.apache.ignite.internal.processors.query.calcite.integration.JoinRehashIntegrationTest;
4646
import org.apache.ignite.internal.processors.query.calcite.integration.KeepBinaryIntegrationTest;
4747
import org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
4848
import org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
@@ -145,7 +145,7 @@
145145
ExpiredEntriesIntegrationTest.class,
146146
TimeoutIntegrationTest.class,
147147
PartitionPruneTest.class,
148-
JoinRehashIntegrationTest.class,
148+
DistributedJoinIntegrationTest.class,
149149
IndexWithSameNameCalciteTest.class,
150150
AuthorizationIntegrationTest.class,
151151
DdlTransactionCalciteSelfTest.class,

0 commit comments

Comments
 (0)