|
25 | 25 | import org.apache.iotdb.commons.path.PartialPath; |
26 | 26 | import org.apache.iotdb.db.conf.IoTDBDescriptor; |
27 | 27 | import org.apache.iotdb.db.exception.StorageEngineException; |
| 28 | +import org.apache.iotdb.db.service.metrics.FileMetrics; |
28 | 29 | import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; |
29 | 30 | import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; |
30 | 31 | import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; |
31 | 32 | import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger; |
32 | 33 | import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; |
| 34 | +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; |
33 | 35 | import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; |
34 | 36 | import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; |
35 | 37 | import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; |
|
38 | 40 | import org.apache.tsfile.common.conf.TSFileDescriptor; |
39 | 41 | import org.apache.tsfile.common.constant.TsFileConstant; |
40 | 42 | import org.apache.tsfile.enums.TSDataType; |
| 43 | +import org.apache.tsfile.exception.write.PageException; |
41 | 44 | import org.apache.tsfile.exception.write.WriteProcessException; |
42 | 45 | import org.apache.tsfile.file.metadata.IDeviceID; |
43 | 46 | import org.apache.tsfile.file.metadata.enums.CompressionType; |
|
65 | 68 | import java.util.List; |
66 | 69 | import java.util.Map; |
67 | 70 | import java.util.Objects; |
| 71 | +import java.util.concurrent.CountDownLatch; |
68 | 72 |
|
69 | 73 | import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.createChunkWriter; |
70 | 74 | import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.createCompressionType; |
@@ -843,6 +847,72 @@ public void testCompactionWithAllEmptyValueChunks() throws IOException, IllegalP |
843 | 847 | validateTargetDatas(sourceDatas, tsDataTypes); |
844 | 848 | } |
845 | 849 |
|
| 850 | + @Test |
| 851 | + public void testCascadedDeletionDuringCompaction() throws IOException, InterruptedException { |
| 852 | + TsFileResource source = createEmptyFileAndResource(true); |
| 853 | + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(source)) { |
| 854 | + writer.startChunkGroup("d1"); |
| 855 | + writer.generateSimpleAlignedSeriesToCurrentDevice( |
| 856 | + Arrays.asList("s1"), |
| 857 | + new TimeRange[] {new TimeRange(10, 20)}, |
| 858 | + TSEncoding.PLAIN, |
| 859 | + CompressionType.LZ4); |
| 860 | + writer.endChunkGroup(); |
| 861 | + writer.endFile(); |
| 862 | + } |
| 863 | + CountDownLatch latch1 = new CountDownLatch(1); |
| 864 | + CountDownLatch latch2 = new CountDownLatch(1); |
| 865 | + InnerSpaceCompactionTask task = |
| 866 | + new InnerSpaceCompactionTask( |
| 867 | + 0, |
| 868 | + tsFileManager, |
| 869 | + Collections.singletonList(source), |
| 870 | + true, |
| 871 | + new TestReadChunkCompactionPerformer(latch1, latch2), |
| 872 | + 0); |
| 873 | + new Thread( |
| 874 | + () -> { |
| 875 | + try { |
| 876 | + latch1.await(); |
| 877 | + try (ModificationFile modificationFile = source.getModFileForWrite()) { |
| 878 | + modificationFile.write( |
| 879 | + new TreeDeletionEntry(new MeasurementPath("root.testsg.d1.s1"), 15)); |
| 880 | + } |
| 881 | + latch2.countDown(); |
| 882 | + } catch (Exception e) { |
| 883 | + throw new RuntimeException(e); |
| 884 | + } |
| 885 | + }) |
| 886 | + .start(); |
| 887 | + Assert.assertTrue(task.start()); |
| 888 | + Assert.assertEquals(1, tsFileManager.getTsFileList(true).size()); |
| 889 | + tsFileManager.getTsFileList(true).get(0).getExclusiveModFile(); |
| 890 | + Assert.assertEquals(1, FileMetrics.getInstance().getModFileNum()); |
| 891 | + } |
| 892 | + |
| 893 | + private static class TestReadChunkCompactionPerformer extends ReadChunkCompactionPerformer { |
| 894 | + |
| 895 | + private final CountDownLatch latch1; |
| 896 | + private final CountDownLatch latch2; |
| 897 | + |
| 898 | + public TestReadChunkCompactionPerformer(CountDownLatch latch1, CountDownLatch latch2) { |
| 899 | + this.latch1 = latch1; |
| 900 | + this.latch2 = latch2; |
| 901 | + } |
| 902 | + |
| 903 | + @Override |
| 904 | + public void perform() |
| 905 | + throws IOException, |
| 906 | + MetadataException, |
| 907 | + InterruptedException, |
| 908 | + StorageEngineException, |
| 909 | + PageException { |
| 910 | + super.perform(); |
| 911 | + latch1.countDown(); |
| 912 | + latch2.await(); |
| 913 | + } |
| 914 | + } |
| 915 | + |
846 | 916 | private void writeEmptyAlignedChunk( |
847 | 917 | AlignedChunkWriterImpl alignedChunkWriter, |
848 | 918 | TsFileIOWriter tsFileIOWriter, |
|
0 commit comments