Skip to content

Commit 41f0eeb

Browse files
IGNITE-26214 SQL Calcite: Optimization for MultiDC - Fixes #12512.
Signed-off-by: Aleksey Plekhanov <[email protected]>
1 parent b20fe87 commit 41f0eeb

File tree

4 files changed

+239
-12
lines changed

4 files changed

+239
-12
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ public static ColocationGroup forNodes(List<UUID> nodeIds) {
7070

7171
/** */
7272
public static ColocationGroup forAssignments(List<List<UUID>> assignments) {
73+
return new ColocationGroup(null, null, assignments, false);
74+
}
75+
76+
/**
77+
* Creates colocation group with assignments equal to cache assignments (i.e. cache assignments on remote nodes
78+
* can be used for the same topology).
79+
*/
80+
public static ColocationGroup forCacheAssignments(List<List<UUID>> assignments) {
7381
return new ColocationGroup(null, null, assignments, true);
7482
}
7583

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableDescriptorImpl.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.BitSet;
2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Objects;
@@ -547,7 +548,28 @@ private ColocationGroup partitionedGroup(@NotNull AffinityTopologyVersion topVer
547548
assignments0.add(F.isEmpty(partNodes) ? emptyList() : singletonList(F.first(partNodes).id()));
548549
}
549550

550-
return ColocationGroup.forAssignments(assignments0);
551+
String dcId = cacheContext().kernalContext().discovery().localNode().dataCenterId();
552+
Collection<UUID> sameDcNodeIds = dcId == null ? null : new HashSet<>(F.viewReadOnly(
553+
cctx.kernalContext().discovery().aliveServerNodes(),
554+
ClusterNode::id, n -> Objects.equals(n.dataCenterId(), dcId)));
555+
556+
if (dcId != null) {
557+
List<List<UUID>> curDcAssignments = new ArrayList<>(assignments0.size());
558+
559+
for (List<UUID> assignment : assignments0) {
560+
List<UUID> curDcAssignment = U.arrayList(assignment, sameDcNodeIds::contains);
561+
562+
// If any assignment become empty after filtration by DC, return original assignments.
563+
if (F.isEmpty(curDcAssignment) && !F.isEmpty(assignment))
564+
return ColocationGroup.forCacheAssignments(assignments0);
565+
566+
curDcAssignments.add(curDcAssignment);
567+
}
568+
569+
return ColocationGroup.forAssignments(curDcAssignments);
570+
}
571+
572+
return ColocationGroup.forCacheAssignments(assignments0);
551573
}
552574

553575
/** */
@@ -557,29 +579,32 @@ private ColocationGroup replicatedGroup(@NotNull AffinityTopologyVersion topVer)
557579
GridDhtPartitionTopology top = cctx.topology();
558580

559581
List<ClusterNode> nodes = cctx.discovery().discoCache(topVer).cacheGroupAffinityNodes(cctx.groupId());
560-
List<UUID> nodes0;
582+
List<UUID> nodeIds;
561583

562584
top.readLock();
563585

564586
try {
565-
if (!top.rebalanceFinished(topVer)) {
566-
nodes0 = new ArrayList<>(nodes.size());
587+
int parts = top.partitions();
567588

568-
int parts = top.partitions();
589+
List<ClusterNode> nodes0 = top.rebalanceFinished(topVer) ? nodes :
590+
U.arrayList(nodes, node -> isOwner(node.id(), top, parts));
569591

570-
for (ClusterNode node : nodes) {
571-
if (isOwner(node.id(), top, parts))
572-
nodes0.add(node.id());
573-
}
592+
String dcId = cacheContext().kernalContext().discovery().localNode().dataCenterId();
593+
594+
if (dcId != null) {
595+
List<ClusterNode> curDcNodes = U.arrayList(nodes0, node -> dcId.equals(node.dataCenterId()));
596+
597+
if (!F.isEmpty(curDcNodes))
598+
nodes0 = curDcNodes;
574599
}
575-
else
576-
nodes0 = Commons.transform(nodes, ClusterNode::id);
600+
601+
nodeIds = Commons.transform(nodes0, ClusterNode::id);
577602
}
578603
finally {
579604
top.readUnlock();
580605
}
581606

582-
return ColocationGroup.forNodes(nodes0);
607+
return ColocationGroup.forNodes(nodeIds);
583608
}
584609

585610
/** */
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.query.calcite.integration;
19+
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import org.apache.ignite.IgniteException;
24+
import org.apache.ignite.IgniteSystemProperties;
25+
import org.apache.ignite.cache.CacheMode;
26+
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
27+
import org.apache.ignite.cache.QueryEntity;
28+
import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
29+
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
30+
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
31+
import org.apache.ignite.cluster.ClusterNode;
32+
import org.apache.ignite.configuration.CacheConfiguration;
33+
import org.apache.ignite.configuration.IgniteConfiguration;
34+
import org.apache.ignite.internal.IgniteEx;
35+
import org.apache.ignite.internal.managers.communication.GridIoMessage;
36+
import org.apache.ignite.internal.processors.query.QueryUtils;
37+
import org.apache.ignite.internal.processors.query.calcite.message.QueryStartRequest;
38+
import org.apache.ignite.internal.util.typedef.F;
39+
import org.apache.ignite.lang.IgniteInClosure;
40+
import org.apache.ignite.plugin.extensions.communication.Message;
41+
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
42+
import org.junit.Test;
43+
44+
/** */
45+
public class MultiDcQueryMappingTest extends AbstractBasicIntegrationTest {
46+
/** */
47+
private static final String DC1 = "DC1";
48+
49+
/** */
50+
private static final String DC2 = "DC2";
51+
52+
/** */
53+
private static final int ROWS_CNT = 100;
54+
55+
/** */
56+
private String dcId;
57+
58+
/** */
59+
AtomicBoolean crossDcRequest = new AtomicBoolean();
60+
61+
/** {@inheritDoc} */
62+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
63+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
64+
65+
cfg.getSqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration());
66+
67+
cfg.setCommunicationSpi(new TcpCommunicationSpi() {
68+
/** {@inheritDoc} */
69+
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
70+
assert msg != null;
71+
72+
if (msg instanceof GridIoMessage) {
73+
GridIoMessage msg0 = (GridIoMessage)msg;
74+
75+
if (msg0.message() instanceof QueryStartRequest) {
76+
if (!ignite().cluster().localNode().dataCenterId().equals(node.dataCenterId()))
77+
crossDcRequest.set(true);
78+
}
79+
}
80+
81+
super.sendMessage(node, msg, ackC);
82+
}
83+
});
84+
85+
cfg.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId));
86+
87+
// Partitioned cache, in both data centers.
88+
CacheConfiguration<Integer, Employer> ccfgPart2dc = cacheConfiguration("part2dc")
89+
.setCacheMode(CacheMode.PARTITIONED)
90+
.setBackups(1)
91+
.setAffinity(new RendezvousAffinityFunction().setAffinityBackupFilter(
92+
new ClusterNodeAttributeAffinityBackupFilter(IgniteSystemProperties.IGNITE_DATA_CENTER_ID)));
93+
94+
// Replicated cache, in both data centers.
95+
CacheConfiguration<Integer, Employer> ccfgRepl2dc = cacheConfiguration("repl2dc")
96+
.setCacheMode(CacheMode.REPLICATED);
97+
98+
// Partitioned cache, in one data center.
99+
CacheConfiguration<Integer, Employer> ccfgPart1dc = cacheConfiguration("part1dc")
100+
.setCacheMode(CacheMode.PARTITIONED)
101+
.setBackups(1)
102+
.setNodeFilter(n -> n.dataCenterId().equals(DC1));
103+
104+
// Replicated cache, in one data center.
105+
CacheConfiguration<Integer, Employer> ccfgRepl1dc = cacheConfiguration("repl1dc")
106+
.setCacheMode(CacheMode.REPLICATED)
107+
.setNodeFilter(n -> n.dataCenterId().equals(DC2));
108+
109+
cfg.setCacheConfiguration(ccfgPart2dc, ccfgRepl2dc, ccfgPart1dc, ccfgRepl1dc);
110+
111+
return cfg;
112+
}
113+
114+
/** */
115+
private CacheConfiguration<Integer, Employer> cacheConfiguration(String name) {
116+
return new CacheConfiguration<Integer, Employer>()
117+
.setName(name)
118+
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
119+
.setSqlSchema(QueryUtils.DFLT_SCHEMA)
120+
.setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Employer.class)
121+
.setTableName(name)
122+
.addQueryField("ID", Integer.class.getName(), "ID")
123+
.setKeyFieldName("ID")
124+
));
125+
}
126+
127+
/** {@inheritDoc} */
128+
@Override protected void beforeTestsStarted() throws Exception {
129+
// No-op.
130+
}
131+
132+
/** {@inheritDoc} */
133+
@Override protected void afterTest() throws Exception {
134+
// No-op.
135+
}
136+
137+
/** */
138+
@Test
139+
public void testQueryMapping() throws Exception {
140+
dcId = DC1;
141+
142+
IgniteEx srv1dc1 = startGrid(0);
143+
IgniteEx srv2dc1 = startGrid(1);
144+
IgniteEx clientDc1 = startClientGrid(2);
145+
146+
dcId = DC2;
147+
148+
IgniteEx srv1dc2 = startGrid(3);
149+
IgniteEx srv2dc2 = startGrid(4);
150+
IgniteEx clientDc2 = startClientGrid(5);
151+
152+
fillTable(clientDc1, "part2dc");
153+
fillTable(clientDc1, "repl2dc");
154+
fillTable(clientDc1, "part1dc");
155+
fillTable(clientDc1, "repl1dc");
156+
157+
checkQueries(clientDc1);
158+
checkQueries(clientDc2);
159+
checkQueries(srv1dc1);
160+
checkQueries(srv1dc2);
161+
checkQueries(srv2dc1);
162+
checkQueries(srv2dc2);
163+
}
164+
165+
/** */
166+
private void fillTable(IgniteEx ignite, String name) {
167+
for (int i = 0; i < ROWS_CNT; i++)
168+
sql(ignite, "INSERT INTO " + name + "(ID, NAME, SALARY) VALUES (?, ?, ?)", i, "name" + i, i);
169+
}
170+
171+
/** */
172+
private void checkQueries(IgniteEx ignite) {
173+
boolean dc1 = DC1.equals(ignite.context().discovery().localNode().dataCenterId());
174+
checkQuery(ignite, "SELECT * FROM part2dc", false);
175+
checkQuery(ignite, "SELECT * FROM repl2dc", false);
176+
checkQuery(ignite, "SELECT * FROM part1dc", !dc1);
177+
checkQuery(ignite, "SELECT * FROM repl1dc", dc1);
178+
checkQuery(ignite, "SELECT * FROM part2dc JOIN repl2dc USING (name)", false);
179+
checkQuery(ignite, "SELECT * FROM part1dc JOIN repl1dc USING (name)", true);
180+
}
181+
182+
/** */
183+
private void checkQuery(IgniteEx ignite, String sql, boolean expCrossDc) {
184+
crossDcRequest.set(false);
185+
186+
List<?> res = sql(ignite, sql);
187+
188+
assertEquals(ROWS_CNT, res.size());
189+
190+
assertEquals(expCrossDc, crossDcRequest.get());
191+
}
192+
}

modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.ignite.internal.processors.query.calcite.integration.LocalQueryIntegrationTest;
5656
import org.apache.ignite.internal.processors.query.calcite.integration.MemoryQuotasIntegrationTest;
5757
import org.apache.ignite.internal.processors.query.calcite.integration.MetadataIntegrationTest;
58+
import org.apache.ignite.internal.processors.query.calcite.integration.MultiDcQueryMappingTest;
5859
import org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest;
5960
import org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest;
6061
import org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest;
@@ -173,6 +174,7 @@
173174
KeyClassChangeIntegrationTest.class,
174175
QueryEntityValueColumnAliasTest.class,
175176
CacheStoreTest.class,
177+
MultiDcQueryMappingTest.class,
176178
})
177179
public class IntegrationTestSuite {
178180
}

0 commit comments

Comments
 (0)