Skip to content

Commit 13f84fd

Browse files
authored
Fix empty tsfile and resource generated when insert, load, kill -9 and restart (apache#16215)
* Fix empty tsfile and resource generated when insert, load, kill -9 and restart * Fix more * Add IT
1 parent 75bdbb4 commit 13f84fd

File tree

2 files changed

+79
-17
lines changed

2 files changed

+79
-17
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import org.apache.iotdb.db.it.utils.TestUtils;
2525
import org.apache.iotdb.it.env.EnvFactory;
2626
import org.apache.iotdb.it.framework.IoTDBTestRunner;
27+
import org.apache.iotdb.it.utils.TsFileGenerator;
2728
import org.apache.iotdb.itbase.category.ClusterIT;
2829
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
2930

31+
import org.apache.commons.io.FileUtils;
32+
import org.apache.tsfile.enums.TSDataType;
33+
import org.apache.tsfile.write.schema.MeasurementSchema;
3034
import org.junit.After;
3135
import org.junit.Before;
3236
import org.junit.Test;
@@ -35,10 +39,13 @@
3539
import org.slf4j.Logger;
3640
import org.slf4j.LoggerFactory;
3741

42+
import java.io.File;
43+
import java.nio.file.Files;
3844
import java.sql.Connection;
3945
import java.sql.ResultSet;
4046
import java.sql.SQLException;
4147
import java.sql.Statement;
48+
import java.util.Collections;
4249

4350
import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
4451
import static org.junit.Assert.assertEquals;
@@ -371,4 +378,56 @@ public void testRecoverFromFlushMemTableError() throws Exception {
371378
}
372379
}
373380
}
381+
382+
@Test
383+
public void testInsertLoadAndRecover() throws Exception {
384+
try (Connection connection = EnvFactory.getEnv().getConnection();
385+
Statement statement = connection.createStatement()) {
386+
statement.execute("create timeseries root.sg.d1.s1 with datatype=int32");
387+
statement.execute("insert into root.sg.d1(time,s1) values(2,2)");
388+
statement.execute("flush");
389+
}
390+
File tmpDir = new File(Files.createTempDirectory("load").toUri());
391+
File tsfile = new File(tmpDir, "0-0-0-0.tsfile");
392+
try {
393+
try (final TsFileGenerator generator = new TsFileGenerator(tsfile)) {
394+
generator.registerTimeseries(
395+
"root.sg.d1", Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
396+
generator.generateData("root.sg.d1", 1, 2, false);
397+
}
398+
try (Connection connection = EnvFactory.getEnv().getConnection();
399+
Statement statement = connection.createStatement()) {
400+
statement.execute("insert into root.sg.d1(time,s1) values(1,1)");
401+
statement.execute(String.format("load \"%s\" ", tsfile.getAbsolutePath()));
402+
try (ResultSet resultSet = statement.executeQuery("select s1 from root.sg.d1")) {
403+
assertNotNull(resultSet);
404+
int cnt = 0;
405+
while (resultSet.next()) {
406+
assertEquals(String.valueOf(cnt + 1), resultSet.getString(1));
407+
cnt++;
408+
}
409+
assertEquals(2, cnt);
410+
}
411+
}
412+
413+
// restart dn
414+
TestUtils.stopForciblyAndRestartDataNodes();
415+
416+
try (Connection connection = EnvFactory.getEnv().getConnection();
417+
Statement statement = connection.createStatement()) {
418+
try (ResultSet resultSet = statement.executeQuery("select s1 from root.sg.d1")) {
419+
assertNotNull(resultSet);
420+
int cnt = 0;
421+
while (resultSet.next()) {
422+
assertEquals(String.valueOf(cnt + 1), resultSet.getString(1));
423+
cnt++;
424+
}
425+
assertEquals(2, cnt);
426+
}
427+
}
428+
429+
} finally {
430+
FileUtils.deleteDirectory(tmpDir);
431+
}
432+
}
374433
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -581,9 +581,10 @@ private void recover() throws DataRegionException {
581581
}
582582
}
583583
}
584-
for (List<TsFileResource> value : partitionTmpUnseqTsFiles.values()) {
584+
for (List<TsFileResource> unseqTsFiles : partitionTmpUnseqTsFiles.values()) {
585+
List<TsFileResource> unsealedTsFiles = new ArrayList<>();
585586
// tsFiles without resource file are unsealed
586-
for (TsFileResource resource : value) {
587+
for (TsFileResource resource : unseqTsFiles) {
587588
if (resource.resourceFileExists()) {
588589
FileMetrics.getInstance()
589590
.addTsFile(
@@ -592,6 +593,13 @@ private void recover() throws DataRegionException {
592593
resource.getTsFile().length(),
593594
false,
594595
resource.getTsFile().getName());
596+
} else {
597+
WALRecoverListener recoverListener =
598+
recoverUnsealedTsFile(resource, dataRegionRecoveryContext, false);
599+
if (recoverListener != null) {
600+
recoverListeners.add(recoverListener);
601+
}
602+
unsealedTsFiles.add(resource);
595603
}
596604
if (ModificationFile.getExclusiveMods(resource.getTsFile()).exists()) {
597605
// update mods file metrics
@@ -600,19 +608,7 @@ private void recover() throws DataRegionException {
600608
resource.upgradeModFile(upgradeModFileThreadPool);
601609
}
602610
}
603-
while (!value.isEmpty()) {
604-
TsFileResource tsFileResource = value.get(value.size() - 1);
605-
if (tsFileResource.resourceFileExists()) {
606-
break;
607-
} else {
608-
value.remove(value.size() - 1);
609-
WALRecoverListener recoverListener =
610-
recoverUnsealedTsFile(tsFileResource, dataRegionRecoveryContext, false);
611-
if (recoverListener != null) {
612-
recoverListeners.add(recoverListener);
613-
}
614-
}
615-
}
611+
unseqTsFiles.removeAll(unsealedTsFiles);
616612
}
617613
// signal wal recover manager to recover this region's files
618614
WALRecoverManager.getInstance().getAllDataRegionScannedLatch().countDown();
@@ -964,7 +960,12 @@ private void recoverSealedTsFiles(
964960
new SealedTsFileRecoverPerformer(sealedTsFile)) {
965961
recoverPerformer.recover();
966962
sealedTsFile.close();
967-
tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
963+
if (!TsFileValidator.getInstance().validateTsFile(sealedTsFile)) {
964+
sealedTsFile.remove();
965+
tsFileManager.remove(sealedTsFile, sealedTsFile.isSeq());
966+
} else {
967+
tsFileResourceManager.registerSealedTsFileResource(sealedTsFile);
968+
}
968969
} catch (Throwable e) {
969970
logger.error("Fail to recover sealed TsFile {}, skip it.", sealedTsFile.getTsFilePath(), e);
970971
} finally {
@@ -1075,7 +1076,9 @@ private void syncRecoverFilesInPartition(
10751076
lastFlushTimeMap.getMemSize(partitionId)));
10761077
}
10771078
for (TsFileResource tsFileResource : resourceList) {
1078-
updateDeviceLastFlushTime(tsFileResource);
1079+
if (!tsFileResource.isDeleted()) {
1080+
updateDeviceLastFlushTime(tsFileResource);
1081+
}
10791082
}
10801083
TimePartitionManager.getInstance()
10811084
.updateAfterFlushing(

0 commit comments

Comments
 (0)