Skip to content

Commit b3dc4bd

Browse files
WIP
1 parent 05fa9b6 commit b3dc4bd

File tree

3 files changed

+196
-4
lines changed

3 files changed

+196
-4
lines changed

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1577,7 +1577,7 @@ private void finishIncrementalSnapshotRestore(UUID reqId, Map<UUID, Boolean> res
15771577
* @param metas Map of snapshot metadata distribution across the cluster.
15781578
* @return Map of cache partitions per each node.
15791579
*/
1580-
private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
1580+
private Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15811581
Map<UUID, List<SnapshotMetadata>> metas,
15821582
BiPredicate<Integer, Integer> filter
15831583
) {
@@ -1586,10 +1586,25 @@ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15861586
List<UUID> nodes = new ArrayList<>(metas.keySet());
15871587
Collections.shuffle(nodes);
15881588

1589-
Map<UUID, List<SnapshotMetadata>> shuffleMetas = new LinkedHashMap<>();
1590-
nodes.forEach(k -> shuffleMetas.put(k, metas.get(k)));
1589+
Map<UUID, List<SnapshotMetadata>> orderedMetas = new LinkedHashMap<>();
15911590

1592-
for (Map.Entry<UUID, List<SnapshotMetadata>> e : shuffleMetas.entrySet()) {
1591+
String locDc = ctx.discovery().node(ctx.localNodeId()).dataCenterId();
1592+
1593+
if (locDc != null) {
1594+
List<UUID> sameDcNodes = nodes.stream()
1595+
.map(uuid -> ctx.discovery().node(uuid))
1596+
.filter(node -> Objects.equals(node.dataCenterId(), locDc))
1597+
.map(ClusterNode::id)
1598+
.collect(Collectors.toList());
1599+
1600+
sameDcNodes.forEach(k -> {
1601+
orderedMetas.put(k, metas.get(k)); // Getting same dc files first
1602+
});
1603+
}
1604+
1605+
nodes.forEach(k -> {orderedMetas.put(k, metas.get(k));});
1606+
1607+
for (Map.Entry<UUID, List<SnapshotMetadata>> e : orderedMetas.entrySet()) {
15931608
UUID nodeId = e.getKey();
15941609

15951610
for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
@@ -1598,6 +1613,8 @@ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
15981613
for (Map.Entry<Integer, Set<Integer>> metaParts : parts.entrySet()) {
15991614
for (Integer partId : metaParts.getValue()) {
16001615
if (filter.test(metaParts.getKey(), partId)) {
1616+
log.info("Getting partition from remote node [node=" + nodeId + ", part=" + partId + "]");
1617+
16011618
nodeToSnp.computeIfAbsent(nodeId, n -> new HashMap<>())
16021619
.computeIfAbsent(metaParts.getKey(), k -> new HashSet<>())
16031620
.add(partId);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
19+
20+
import java.io.IOException;
21+
import java.nio.file.DirectoryStream;
22+
import java.nio.file.Files;
23+
import java.nio.file.Path;
24+
import java.nio.file.Paths;
25+
import java.util.Collections;
26+
import java.util.function.Function;
27+
import org.apache.ignite.Ignite;
28+
import org.apache.ignite.IgniteCheckedException;
29+
import org.apache.ignite.IgniteDataStreamer;
30+
import org.apache.ignite.IgniteException;
31+
import org.apache.ignite.IgniteSystemProperties;
32+
import org.apache.ignite.cluster.ClusterState;
33+
import org.apache.ignite.configuration.CacheConfiguration;
34+
import org.apache.ignite.configuration.IgniteConfiguration;
35+
import org.apache.ignite.internal.IgniteEx;
36+
import org.apache.ignite.internal.util.typedef.internal.U;
37+
import org.apache.ignite.testframework.ListeningTestLogger;
38+
import org.apache.ignite.testframework.LogListener;
39+
import org.junit.After;
40+
import org.junit.Before;
41+
import org.junit.Test;
42+
43+
import static org.apache.ignite.internal.util.IgniteUtils.defaultWorkDirectory;
44+
45+
/** */
46+
public class IgniteSnapshotRestoreFromRemoteMdcTest extends AbstractSnapshotSelfTest {
47+
/** Cache. */
48+
protected static final String CACHE = "cache";
49+
50+
/** */
51+
private static final String DC_ID_0 = "DC_ID_0";
52+
53+
/** */
54+
private static final String DC_ID_1 = "DC_ID_1";
55+
56+
/** Cache value builder. */
57+
private final Function<Integer, Object> valBuilder = String::valueOf;
58+
59+
/** */
60+
protected ListeningTestLogger listeningLog = new ListeningTestLogger(log);
61+
62+
/** @throws Exception If fails. */
63+
@Before
64+
public void before() throws Exception {
65+
cleanPersistenceDir();
66+
cleanupDedicatedPersistenceDirs();
67+
}
68+
69+
/** @throws Exception If fails. */
70+
@After
71+
public void after() throws Exception {
72+
afterTestSnapshot();
73+
}
74+
75+
/** {@inheritDoc} */
76+
@Override protected void afterTest() throws Exception {
77+
super.afterTest();
78+
79+
System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID);
80+
}
81+
82+
/** {@inheritDoc} */
83+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
84+
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
85+
86+
if (listeningLog != null)
87+
cfg.setGridLogger(listeningLog);
88+
89+
return cfg;
90+
}
91+
92+
/** @throws Exception If failed. */
93+
@Test
94+
public void testMdcAwareSnapshot() throws Exception {
95+
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, DC_ID_0);
96+
97+
Ignite supplier = startGridWithCustomWorkdir("supplier");
98+
99+
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, DC_ID_1);
100+
101+
IgniteEx other = startGridWithCustomWorkdir("other_dc_node");
102+
103+
fillCache(other);
104+
105+
forceCheckpoint();
106+
107+
snp(other).createSnapshot(SNAPSHOT_NAME, null, false, false).get(TIMEOUT);
108+
109+
other.cache(CACHE).destroy();
110+
111+
awaitPartitionMapExchange();
112+
113+
System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, DC_ID_0);
114+
115+
startGridWithCustomWorkdir("demander");
116+
117+
LogListener supLsnr = LogListener.matches("Getting partition from remote node [node=" +
118+
supplier.cluster().localNode().id()).build();
119+
LogListener otherLsnr = LogListener.matches("Getting partition from remote node [node=" +
120+
other.cluster().localNode().id()).build();
121+
122+
listeningLog.registerListener(supLsnr);
123+
listeningLog.registerListener(otherLsnr);
124+
125+
other.snapshot().restoreSnapshot(SNAPSHOT_NAME, Collections.singleton(CACHE)).get(TIMEOUT);
126+
127+
assertTrue(supLsnr.check());
128+
assertFalse(otherLsnr.check());
129+
130+
assertCacheKeys(other.cache(CACHE), CACHE_KEYS_RANGE);
131+
}
132+
133+
/** */
134+
private void fillCache(IgniteEx ignite) {
135+
CacheConfiguration<Integer, Object> ccfg = new CacheConfiguration<>();
136+
137+
ccfg.setName(CACHE);
138+
ccfg.setBackups(2); // To fill both nodes
139+
140+
ignite.createCache(ccfg);
141+
142+
try (IgniteDataStreamer<Integer, Object> ds = ignite.dataStreamer(ccfg.getName())) {
143+
for (int i = 0; i < CACHE_KEYS_RANGE; i++)
144+
ds.addData(i, valBuilder.apply(i));
145+
}
146+
}
147+
148+
/** */
149+
private IgniteEx startGridWithCustomWorkdir(String instanceName) throws Exception {
150+
IgniteConfiguration cfg = getConfiguration(instanceName);
151+
152+
cfg.setWorkDirectory(Paths.get(defaultWorkDirectory(), U.maskForFileName(instanceName)).toString());
153+
154+
IgniteEx ignite = startGrid(cfg);
155+
156+
ignite.cluster().state(ClusterState.ACTIVE);
157+
resetBaselineTopology();
158+
awaitPartitionMapExchange();
159+
160+
return ignite;
161+
}
162+
163+
/** */
164+
protected static void cleanupDedicatedPersistenceDirs() {
165+
try (DirectoryStream<Path> ds = Files.newDirectoryStream(Path.of(defaultWorkDirectory()))) {
166+
for (Path dir : ds)
167+
U.delete(dir);
168+
}
169+
catch (IOException | IgniteCheckedException e) {
170+
throw new IgniteException(e);
171+
}
172+
}
173+
}

modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotMXBeanTest;
2828
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManagerSelfTest;
2929
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRemoteRequestTest;
30+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteMdcTest;
3031
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest;
3132
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRollingUpgradeTest;
3233
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest;
@@ -51,6 +52,7 @@ public static void addSnapshotTests(List<Class<?>> suite, Collection<Class> igno
5152
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotManagerSelfTest.class, ignoredTests);
5253
GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotSelfTest.class, ignoredTests);
5354
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRemoteRequestTest.class, ignoredTests);
55+
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRestoreFromRemoteMdcTest.class, ignoredTests);
5456
GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotCheckTest.class, ignoredTests);
5557
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotWithMetastorageTest.class, ignoredTests);
5658
GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotMXBeanTest.class, ignoredTests);

0 commit comments

Comments
 (0)