Skip to content

Commit a2bbb51

Browse files
authored
Fix that compression ratio is not transferred during region migration (apache#16352)
1 parent 5b08009 commit a2bbb51

File tree

5 files changed

+224
-2
lines changed

5 files changed

+224
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
21+
22+
import org.apache.iotdb.consensus.ConsensusFactory;
23+
import org.apache.iotdb.it.env.EnvFactory;
24+
25+
import org.apache.tsfile.utils.Pair;
26+
import org.awaitility.Awaitility;
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import java.sql.Connection;
33+
import java.sql.ResultSet;
34+
import java.sql.Statement;
35+
import java.util.Map;
36+
import java.util.Set;
37+
import java.util.concurrent.TimeUnit;
38+
39+
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
40+
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
41+
42+
public class IoTDBRegionMigrateWithCompressionRatioIT {
43+
@Before
44+
public void setUp() throws Exception {
45+
EnvFactory.getEnv()
46+
.getConfig()
47+
.getCommonConfig()
48+
.setDataReplicationFactor(2)
49+
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
50+
EnvFactory.getEnv().initClusterEnvironment(1, 3);
51+
}
52+
53+
@After
54+
public void tearDown() throws Exception {
55+
EnvFactory.getEnv().cleanClusterEnvironment();
56+
}
57+
58+
@Test
59+
public void testWithCompressionRatio() throws Exception {
60+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
61+
Statement statement = connection.createStatement()) {
62+
statement.execute("CREATE DATABASE test");
63+
statement.execute("USE test");
64+
statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
65+
statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
66+
statement.execute("FLUSH");
67+
68+
Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
69+
getDataRegionMapWithLeader(statement);
70+
int dataRegionIdForTest =
71+
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
72+
73+
Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest);
74+
Set<Integer> allDataNodes = getAllDataNodes(statement);
75+
int leaderId = leaderAndNodes.getLeft();
76+
int followerId =
77+
leaderAndNodes.getRight().stream().filter(i -> i != leaderId).findAny().get();
78+
int newLeaderId =
79+
allDataNodes.stream().filter(i -> i != leaderId && i != followerId).findAny().get();
80+
81+
System.out.printf(
82+
"Old leader: %d, follower: %d, new leader: %d%n", leaderId, followerId, newLeaderId);
83+
84+
double[] compressionRatioBeforeMigration = new double[] {Double.NaN};
85+
Awaitility.await()
86+
.atMost(10, TimeUnit.MINUTES)
87+
.pollDelay(1, TimeUnit.SECONDS)
88+
.untilAsserted(
89+
() -> {
90+
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
91+
while (showRegions.next()) {
92+
int regionId = showRegions.getInt("RegionId");
93+
int dataNodeId = showRegions.getInt("DataNodeId");
94+
if (regionId == dataRegionIdForTest && dataNodeId == leaderId) {
95+
compressionRatioBeforeMigration[0] =
96+
showRegions.getDouble("CompressionRatio");
97+
break;
98+
}
99+
}
100+
}
101+
Assert.assertFalse(Double.isNaN(compressionRatioBeforeMigration[0]));
102+
});
103+
104+
statement.execute(
105+
String.format(
106+
"migrate region %d from %d to %d", dataRegionIdForTest, leaderId, newLeaderId));
107+
108+
double finalCompressionRatioBeforeMigration = compressionRatioBeforeMigration[0];
109+
Awaitility.await()
110+
.atMost(10, TimeUnit.MINUTES)
111+
.pollDelay(1, TimeUnit.SECONDS)
112+
.untilAsserted(
113+
() -> {
114+
double compressionRatioAfterMigration = 0.0;
115+
try (ResultSet showRegions = statement.executeQuery("SHOW REGIONS")) {
116+
while (showRegions.next()) {
117+
int regionId = showRegions.getInt("RegionId");
118+
int dataNodeId = showRegions.getInt("DataNodeId");
119+
if (regionId == dataRegionIdForTest && dataNodeId == newLeaderId) {
120+
compressionRatioAfterMigration = showRegions.getDouble("CompressionRatio");
121+
break;
122+
}
123+
}
124+
}
125+
Assert.assertEquals(
126+
finalCompressionRatioBeforeMigration, compressionRatioAfterMigration, 0.0001);
127+
});
128+
}
129+
}
130+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/CompressionRatio.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public static void deleteRedundantFilesByIndex(File[] files, int index) throws I
316316
}
317317

318318
@TestOnly
319-
void reset() throws IOException {
319+
public void reset() throws IOException {
320320
if (!directory.exists()) {
321321
return;
322322
}
@@ -328,9 +328,26 @@ void reset() throws IOException {
328328
Files.delete(file.toPath());
329329
}
330330
totalMemorySize = new AtomicLong(0);
331+
dataRegionRatioMap.clear();
331332
totalDiskSize = 0L;
332333
}
333334

335+
public synchronized File getCompressionRatioFile(String dataRegionId) {
336+
Pair<Long, Long> dataRegionCompressionRatio = dataRegionRatioMap.get(dataRegionId);
337+
if (dataRegionCompressionRatio == null) {
338+
return null;
339+
}
340+
return SystemFileFactory.INSTANCE.getFile(
341+
directory,
342+
String.format(
343+
Locale.ENGLISH,
344+
RATIO_FILE_PATH_FORMAT,
345+
dataRegionCompressionRatio.getLeft(),
346+
dataRegionCompressionRatio.getRight())
347+
+ "."
348+
+ dataRegionId);
349+
}
350+
334351
public Map<String, Pair<Long, Long>> getDataRegionRatioMap() {
335352
return dataRegionRatioMap;
336353
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
2525
import org.apache.iotdb.db.storageengine.StorageEngine;
2626
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
27+
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
2728
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
2829
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
2930

@@ -121,7 +122,9 @@ private DataRegion loadSnapshotWithoutLog() {
121122
return null;
122123
}
123124
LOGGER.info("Moving snapshot file to data dirs");
124-
createLinksFromSnapshotDirToDataDirWithoutLog(new File(snapshotPath));
125+
File snapshotDir = new File(snapshotPath);
126+
createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
127+
loadCompressionRatio(snapshotDir);
125128
return loadSnapshot();
126129
} catch (IOException | DiskSpaceInsufficientException e) {
127130
LOGGER.error(
@@ -130,6 +133,34 @@ private DataRegion loadSnapshotWithoutLog() {
130133
}
131134
}
132135

136+
private void loadCompressionRatio(File snapshotDir) {
137+
File[] compressionFiles =
138+
snapshotDir.listFiles(f -> f.getName().startsWith(CompressionRatio.FILE_PREFIX));
139+
if (compressionFiles == null || compressionFiles.length == 0) {
140+
LOGGER.info("No compression ratio file in dir {}", snapshotPath);
141+
return;
142+
}
143+
File ratioFile = compressionFiles[0];
144+
String fileName = ratioFile.getName();
145+
String ratioPart = fileName.substring(0, fileName.lastIndexOf("."));
146+
String dataRegionId = fileName.substring(fileName.lastIndexOf(".") + 1);
147+
148+
String[] fileNameArray = ratioPart.split("-");
149+
// fileNameArray.length != 3 means the compression ratio may be negative, ignore it
150+
if (fileNameArray.length == 3) {
151+
try {
152+
long rawSize = Long.parseLong(fileNameArray[1]);
153+
long diskSize = Long.parseLong(fileNameArray[2]);
154+
CompressionRatio.getInstance().updateRatio(rawSize, diskSize, dataRegionId);
155+
} catch (NumberFormatException ignore) {
156+
// ignore illegal compression file name
157+
} catch (IOException e) {
158+
LOGGER.warn("Cannot load compression ratio from {}", ratioFile, e);
159+
}
160+
}
161+
LOGGER.info("Loaded compression ratio from {}", ratioFile);
162+
}
163+
133164
private DataRegion loadSnapshotWithLog(File logFile) {
134165
boolean snapshotComplete = false;
135166
try {
@@ -151,6 +182,7 @@ private DataRegion loadSnapshotWithLog(File logFile) {
151182
deleteAllFilesInDataDirs();
152183
LOGGER.info("Remove all data files in original data dir");
153184
createLinksFromSnapshotDirToDataDirWithLog();
185+
loadCompressionRatio(new File(snapshotPath));
154186
return loadSnapshot();
155187
} catch (IOException e) {
156188
LOGGER.error("Failed to remove origin data files", e);
@@ -497,6 +529,14 @@ private List<File> getSnapshotFileWithLog(File logFile) throws IOException {
497529
+ snapshotId;
498530
fileList.addAll(searchDataFilesRecursively(snapshotDir));
499531
}
532+
533+
File[] compressionRatioFiles =
534+
logFile
535+
.getParentFile()
536+
.listFiles(f -> f.getName().startsWith(CompressionRatio.FILE_PREFIX));
537+
if (compressionRatioFiles != null) {
538+
fileList.addAll(Arrays.asList(compressionRatioFiles));
539+
}
500540
return fileList;
501541
} finally {
502542
analyzer.close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotTaker.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2525
import org.apache.iotdb.db.exception.DirectoryNotLegalException;
2626
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
27+
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
2728
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
2829
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
2930
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -100,6 +101,7 @@ public boolean takeFullSnapshot(
100101
}
101102
success = createSnapshot(seqFiles, tempSnapshotId);
102103
success = success && createSnapshot(unseqFiles, tempSnapshotId);
104+
success = success && snapshotCompressionRatio(snapshotDirPath);
103105
} finally {
104106
readUnlockTheFile();
105107
}
@@ -136,6 +138,31 @@ public boolean takeFullSnapshot(
136138
}
137139
}
138140

141+
private boolean snapshotCompressionRatio(String snapshotDir) {
142+
File compressionRatioFile =
143+
CompressionRatio.getInstance().getCompressionRatioFile(dataRegion.getDataRegionId());
144+
if (compressionRatioFile != null) {
145+
LOGGER.info("Snapshotting compression ratio {}.", compressionRatioFile.getName());
146+
try {
147+
File snapshotFile = new File(snapshotDir, compressionRatioFile.getName());
148+
if (snapshotFile.createNewFile()) {
149+
// write one byte so that it will not be skipped
150+
Files.write(snapshotFile.toPath(), new byte[1]);
151+
LOGGER.info(
152+
"Snapshot compression ratio {} in {}.", compressionRatioFile.getName(), snapshotDir);
153+
return true;
154+
}
155+
} catch (IOException ignored) {
156+
LOGGER.warn(
157+
"Cannot snapshot compression ratio {} in {}.",
158+
compressionRatioFile.getName(),
159+
snapshotDir);
160+
}
161+
return false;
162+
}
163+
return true;
164+
}
165+
139166
public boolean cleanSnapshot() {
140167
return clearSnapshotOfDataRegion(this.dataRegion);
141168
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@
2525
import org.apache.iotdb.db.exception.DirectoryNotLegalException;
2626
import org.apache.iotdb.db.exception.StorageEngineException;
2727
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
28+
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
2829
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2930
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
3031
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
3132
import org.apache.iotdb.db.utils.EnvironmentUtils;
3233

3334
import org.apache.tsfile.exception.write.WriteProcessException;
3435
import org.apache.tsfile.file.metadata.IDeviceID;
36+
import org.apache.tsfile.utils.Pair;
3537
import org.apache.tsfile.utils.TsFileGeneratorUtils;
3638
import org.junit.After;
3739
import org.junit.Assert;
@@ -179,18 +181,24 @@ public void testLoadSnapshot()
179181
try {
180182
List<TsFileResource> resources = writeTsFiles();
181183
DataRegion region = new DataRegion(testSgName, "0");
184+
CompressionRatio.getInstance().updateRatio(100, 100, "0");
182185
region.getTsFileManager().addAll(resources, true);
183186
File snapshotDir = new File("target" + File.separator + "snapshot");
184187
Assert.assertTrue(snapshotDir.exists() || snapshotDir.mkdirs());
185188
try {
186189
Assert.assertTrue(
187190
new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true));
191+
CompressionRatio.getInstance().reset();
192+
188193
DataRegion dataRegion =
189194
new SnapshotLoader(snapshotDir.getAbsolutePath(), testSgName, "0")
190195
.loadSnapshotForStateMachine();
191196
Assert.assertNotNull(dataRegion);
192197
List<TsFileResource> resource = dataRegion.getTsFileManager().getTsFileList(true);
193198
Assert.assertEquals(100, resource.size());
199+
Assert.assertEquals(
200+
new Pair<>(100L, 100L),
201+
CompressionRatio.getInstance().getDataRegionRatioMap().get("0"));
194202
} finally {
195203
FileUtils.recursivelyDeleteFolder(snapshotDir.getAbsolutePath());
196204
}

0 commit comments

Comments
 (0)