Skip to content

Commit 098ddec

Browse files
IGNITE-26602 Snapshot component optimizations for MultiDC (#12562)
1 parent afff124 commit 098ddec

File tree

3 files changed

+184
-4
lines changed

3 files changed

+184
-4
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java

Lines changed: 17 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1577,7 +1577,7 @@ private void finishIncrementalSnapshotRestore(UUID reqId, Map<UUID, Boolean> res
15771577
* @param metas Map of snapshot metadata distribution across the cluster.
15781578
* @return Map of cache partitions per each node.
15791579
*/
1580-
private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
1580+
private Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15811581
Map<UUID, List<SnapshotMetadata>> metas,
15821582
BiPredicate<Integer, Integer> filter
15831583
) {
@@ -1586,10 +1586,23 @@ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15861586
List<UUID> nodes = new ArrayList<>(metas.keySet());
15871587
Collections.shuffle(nodes);
15881588

1589-
Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
1590-
nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
1589+
Map<UUID, List<SnapshotMetadata>> orderedMetas = new LinkedHashMap<>();
15911590

1592-
for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
1591+
String locDc = ctx.discovery().localNode().dataCenterId();
1592+
1593+
if (locDc != null) {
1594+
List<UUID> sameDcNodes = nodes.stream()
1595+
.map(uuid -> ctx.discovery().node(uuid))
1596+
.filter(node -> Objects.equals(node.dataCenterId(), locDc))
1597+
.map(ClusterNode::id)
1598+
.collect(Collectors.toList());
1599+
1600+
sameDcNodes.forEach(k -> orderedMetas.put(k, metas.get(k))); // Getting same DC files first.
1601+
}
1602+
1603+
nodes.forEach(k -> orderedMetas.put(k, metas.get(k)));
1604+
1605+
for (Map.Entry<UUID, List<SnapshotMetadata>> e : orderedMetas.entrySet()) {
15931606
UUID nodeId = e.getKey();
15941607

15951608
for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
19+
20+
import java.io.IOException;
21+
import java.nio.file.DirectoryStream;
22+
import java.nio.file.Files;
23+
import java.nio.file.Path;
24+
import java.nio.file.Paths;
25+
import java.util.Collections;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import org.apache.ignite.Ignite;
28+
import org.apache.ignite.IgniteCheckedException;
29+
import org.apache.ignite.IgniteDataStreamer;
30+
import org.apache.ignite.IgniteException;
31+
import org.apache.ignite.IgniteSystemProperties;
32+
import org.apache.ignite.cache.CacheMode;
33+
import org.apache.ignite.cluster.ClusterState;
34+
import org.apache.ignite.configuration.CacheConfiguration;
35+
import org.apache.ignite.configuration.IgniteConfiguration;
36+
import org.apache.ignite.internal.IgniteEx;
37+
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
38+
import org.apache.ignite.internal.util.typedef.F;
39+
import org.apache.ignite.internal.util.typedef.internal.U;
40+
import org.junit.Before;
41+
import org.junit.Test;
42+
43+
import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory;
44+
45+
/** */
46+
public class IgniteSnapshotRestoreFromRemoteMdcTest extends AbstractSnapshotSelfTest {
47+
/** Cache. */
48+
private static final String CACHE = "cache";
49+
50+
/** */
51+
private static final String DC_ID_0 = "DC_ID_0";
52+
53+
/** */
54+
private static final String DC_ID_1 = "DC_ID_1";
55+
56+
/** @throws Exception If fails. */
57+
@Before
58+
public void before() throws Exception {
59+
cleanPersistenceDir();
60+
cleanupDedicatedPersistenceDirs();
61+
}
62+
63+
/** {@inheritDoc} */
64+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
65+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
66+
67+
cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
68+
69+
return cfg;
70+
}
71+
72+
/** @throws Exception If failed. */
73+
@Test
74+
public void testMdcAwareSnapshotFromCurrentDc() throws Exception {
75+
testMdcAwareSnapshot(true);
76+
}
77+
78+
/** @throws Exception If failed. */
79+
@Test
80+
public void testMdcAwareSnapshotFromBothDc() throws Exception {
81+
testMdcAwareSnapshot(false);
82+
}
83+
84+
/** @throws Exception If failed. */
85+
private void testMdcAwareSnapshot(boolean replicatedCache) throws Exception {
86+
Ignite supplier = startGridWithCustomWorkdir("supplier", DC_ID_0);
87+
88+
IgniteEx other = startGridWithCustomWorkdir("other_dc_node", DC_ID_1);
89+
90+
other.cluster().state(ClusterState.ACTIVE);
91+
92+
fillCache(other, replicatedCache);
93+
94+
snp(other).createSnapshot(SNAPSHOT_NAME, null, false, false).get(TIMEOUT);
95+
96+
other.cache(CACHE).destroy();
97+
98+
awaitPartitionMapExchange();
99+
100+
Ignite demander = startGridWithCustomWorkdir("demander", DC_ID_0);
101+
102+
resetBaselineTopology();
103+
104+
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(demander);
105+
106+
AtomicInteger supReqCnt = new AtomicInteger();
107+
AtomicInteger otherReqCnt = new AtomicInteger();
108+
109+
spi.blockMessages((node, message) -> {
110+
if (message instanceof SnapshotFilesRequestMessage) {
111+
if (node.id().equals(supplier.cluster().localNode().id()))
112+
supReqCnt.incrementAndGet();
113+
else if (node.id().equals(other.cluster().localNode().id()))
114+
otherReqCnt.incrementAndGet();
115+
}
116+
117+
return false;
118+
});
119+
120+
other.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE)).get(60_000);
121+
122+
assertTrue(supReqCnt.get() > 0);
123+
assertEquals(!replicatedCache, otherReqCnt.get() > 0);
124+
125+
assertCacheKeys(other.cache(CACHE), CACHE_KEYS_RANGE);
126+
}
127+
128+
/** */
129+
private void fillCache(IgniteEx ignite, boolean replicatedCache) {
130+
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>();
131+
132+
ccfg.setName(CACHE);
133+
134+
if (replicatedCache)
135+
ccfg.setCacheMode(CacheMode.REPLICATED); // Fill all nodes with partitions.
136+
137+
ignite.createCache(ccfg);
138+
139+
try (IgniteDataStreamer<Integer, Object> ds = ignite.dataStreamer(ccfg.getName())) {
140+
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
141+
ds.addData(i, valueBuilder().apply(i));
142+
}
143+
}
144+
145+
/** */
146+
private IgniteEx startGridWithCustomWorkdir(String instanceName, String dcId) throws Exception {
147+
IgniteConfiguration cfg = getConfiguration(instanceName)
148+
.setUserAttributes(F.asMap(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, dcId));
149+
150+
cfg.setWorkDirectory(Paths.get(defaultWorkDirectory(), U.maskForFileName(instanceName)).toString());
151+
152+
return startGrid(cfg);
153+
}
154+
155+
/** */
156+
protected static void cleanupDedicatedPersistenceDirs() {
157+
try (DirectoryStream<Path> ds = Files.newDirectoryStream(Path.of(defaultWorkDirectory()))) {
158+
for (Path dir : ds)
159+
U.delete(dir);
160+
}
161+
catch (IOException | IgniteCheckedException e) {
162+
throw new IgniteException(e);
163+
}
164+
}
165+
}

modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@
2727
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
2828
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
2929
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
30+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteMdcTest;
3031
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
3132
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRollingUpgradeTest;
3233
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
@@ -51,6 +52,7 @@ public static void addSnapshotTests(List<Class<?>> suite, Collection<Class> igno
5152
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotManagerSelfTest.class, ignoredTests);
5253
GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotSelfTest.class, ignoredTests);
5354
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRemoteRequestTest.class, ignoredTests);
55+
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRestoreFromRemoteMdcTest.class, ignoredTests);
5456
GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotCheckTest.class, ignoredTests);
5557
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotWithMetastorageTest.class, ignoredTests);
5658
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotMXBeanTest.class, ignoredTests);

0 commit comments

Comments
 (0)