Skip to content

Commit f39cf38

Browse files
authored
Ignore data of deprecated table in compaction (apache#16543)
* Ignore data of deprecated table in compaction
* Fix UT
1 parent c4fc2e9 commit f39cf38

File tree

9 files changed

+216
-96
lines changed

9 files changed

+216
-96
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ public void perform() throws Exception {
125125
? new FastCrossCompactionWriter(targetFiles, seqFiles, readerCacheMap)
126126
: new FastInnerCompactionWriter(targetFiles)) {
127127
List<Schema> schemas =
128-
CompactionTableSchemaCollector.collectSchema(seqFiles, unseqFiles, readerCacheMap);
128+
CompactionTableSchemaCollector.collectSchema(
129+
seqFiles, unseqFiles, readerCacheMap, deviceIterator.getDeprecatedTableSchemaMap());
129130
compactionWriter.setSchemaForAllTargetFile(schemas);
130131
readModification(seqFiles);
131132
readModification(unseqFiles);
@@ -210,6 +211,10 @@ private void compactAlignedSeries(
210211
measurementSchemas.add(entry.getValue().left);
211212
timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right);
212213
}
214+
// the current device may have been skipped entirely (e.g. its data belongs to a deprecated table), so there is nothing to compact
215+
if (measurementSchemas.isEmpty()) {
216+
return;
217+
}
213218

214219
FastCompactionTaskSummary taskSummary = new FastCompactionTaskSummary();
215220
new FastCompactionPerformerSubTask(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,10 @@ public void perform()
9191
PageException {
9292
try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(seqFiles)) {
9393
schema =
94-
CompactionTableSchemaCollector.collectSchema(seqFiles, deviceIterator.getReaderMap());
94+
CompactionTableSchemaCollector.collectSchema(
95+
seqFiles,
96+
deviceIterator.getReaderMap(),
97+
deviceIterator.getDeprecatedTableSchemaMap());
9598
while (deviceIterator.hasNextDevice()) {
9699
currentWriter = getAvailableCompactionWriter();
97100
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,10 @@ public void perform() throws Exception {
118118
new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
119119
List<Schema> schemas =
120120
CompactionTableSchemaCollector.collectSchema(
121-
seqFiles, unseqFiles, deviceIterator.getReaderMap());
121+
seqFiles,
122+
unseqFiles,
123+
deviceIterator.getReaderMap(),
124+
deviceIterator.getDeprecatedTableSchemaMap());
122125
compactionWriter.setSchemaForAllTargetFile(schemas);
123126
while (deviceIterator.hasNextDevice()) {
124127
checkThreadInterrupted();
@@ -271,6 +274,7 @@ public static IDataBlockReader constructReader(
271274
} else {
272275
seriesPath = new NonAlignedFullPath(deviceId, measurementSchemas.get(0));
273276
}
277+
274278
return new SeriesDataBlockReader(
275279
seriesPath, new HashSet<>(allSensors), fragmentInstanceContext, queryDataSource, true);
276280
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ public CompactionTableSchema(String tableName) {
3333
super(tableName);
3434
}
3535

36-
public void merge(TableSchema tableSchema) {
36+
public boolean merge(TableSchema tableSchema) {
3737
if (tableSchema == null) {
38-
return;
38+
return true;
3939
}
4040
if (!tableSchema.getTableName().equals(this.tableName)) {
4141
throw new CompactionTableSchemaNotMatchException(
@@ -60,11 +60,7 @@ public void merge(TableSchema tableSchema) {
6060
IMeasurementSchema idColumnToMerge = idColumnSchemasToMerge.get(i);
6161
IMeasurementSchema currentIdColumn = measurementSchemas.get(i);
6262
if (!idColumnToMerge.getMeasurementName().equals(currentIdColumn.getMeasurementName())) {
63-
throw new CompactionTableSchemaNotMatchException(
64-
"current id column name is "
65-
+ currentIdColumn.getMeasurementName()
66-
+ ", other id column name in same position is "
67-
+ idColumnToMerge.getMeasurementName());
63+
return false;
6864
}
6965
}
7066

@@ -75,6 +71,7 @@ public void merge(TableSchema tableSchema) {
7571
columnCategories.add(ColumnCategory.TAG);
7672
measurementSchemas.add(newIdColumn);
7773
}
74+
return true;
7875
}
7976

8077
public CompactionTableSchema copy() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.HashMap;
31+
import java.util.HashSet;
3132
import java.util.List;
3233
import java.util.Map;
34+
import java.util.Set;
3335
import java.util.stream.Collectors;
3436
import java.util.stream.Stream;
3537

@@ -39,13 +41,17 @@ private CompactionTableSchemaCollector() {}
3941
public static List<Schema> collectSchema(
4042
List<TsFileResource> seqFiles,
4143
List<TsFileResource> unseqFiles,
42-
Map<TsFileResource, TsFileSequenceReader> readerMap)
44+
Map<TsFileResource, TsFileSequenceReader> readerMap,
45+
Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
4346
throws IOException {
4447
List<Schema> targetSchemas = new ArrayList<>(seqFiles.size());
4548
Schema schema =
4649
collectSchema(
47-
Stream.concat(seqFiles.stream(), unseqFiles.stream()).collect(Collectors.toList()),
48-
readerMap);
50+
Stream.concat(seqFiles.stream(), unseqFiles.stream())
51+
.sorted(TsFileResource::compareFileName)
52+
.collect(Collectors.toList()),
53+
readerMap,
54+
deprecatedTableSchemaMap);
4955

5056
targetSchemas.add(schema);
5157
for (int i = 1; i < seqFiles.size(); i++) {
@@ -64,11 +70,14 @@ public static Schema copySchema(Schema source) {
6470
}
6571

6672
public static Schema collectSchema(
67-
List<TsFileResource> sourceFiles, Map<TsFileResource, TsFileSequenceReader> readerMap)
73+
List<TsFileResource> sourceFiles,
74+
Map<TsFileResource, TsFileSequenceReader> readerMap,
75+
Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
6876
throws IOException {
6977
Schema targetSchema = new Schema();
7078
Map<String, TableSchema> targetTableSchemaMap = new HashMap<>();
71-
for (TsFileResource resource : sourceFiles) {
79+
for (int i = 0; i < sourceFiles.size(); i++) {
80+
TsFileResource resource = sourceFiles.get(i);
7281
TsFileSequenceReader reader = readerMap.get(resource);
7382
Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
7483
if (tableSchemaMap == null) {
@@ -89,7 +98,19 @@ public static Schema collectSchema(
8998
collectedTableSchema = new CompactionTableSchema(tableName);
9099
targetTableSchemaMap.put(tableName, collectedTableSchema);
91100
}
92-
collectedTableSchema.merge(currentTableSchema);
101+
boolean canMerge = collectedTableSchema.merge(currentTableSchema);
102+
if (!canMerge) {
103+
// mark resources with deprecated table schema
104+
for (int j = 0; j < i; j++) {
105+
deprecatedTableSchemaMap
106+
.computeIfAbsent(sourceFiles.get(j), k -> new HashSet<>())
107+
.add(tableName);
108+
}
109+
// replace old table schema in targetTableSchemaMap
110+
collectedTableSchema = new CompactionTableSchema(tableName);
111+
collectedTableSchema.merge(currentTableSchema);
112+
targetTableSchemaMap.put(tableName, collectedTableSchema);
113+
}
93114
}
94115
}
95116
targetTableSchemaMap.values().forEach(targetSchema::registerTableSchema);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.List;
5858
import java.util.Map;
5959
import java.util.Optional;
60+
import java.util.Set;
6061
import java.util.TreeMap;
6162
import java.util.concurrent.ConcurrentHashMap;
6263
import java.util.stream.Collectors;
@@ -69,6 +70,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
6970
// sort from the oldest to the newest by version (Used by ReadChunkPerformer)
7071
private List<TsFileResource> tsFileResourcesSortedByAsc;
7172
private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>();
73+
private final Map<TsFileResource, Set<String>> deprecatedTableSchemaMap = new HashMap<>();
7274
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>();
7375
private final Map<TsFileResource, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>>
7476
modificationCache = new HashMap<>();
@@ -344,6 +346,9 @@ public Map<String, MeasurementSchema> getAllSchemasOfCurrentDevice() throws IOEx
344346
// which means this tsfile does not contain the current device, then skip it.
345347
continue;
346348
}
349+
if (isCurrentDeviceDataInDeprecatedTable(resource)) {
350+
continue;
351+
}
347352

348353
TsFileSequenceReader reader = readerMap.get(resource);
349354
for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>> entrySet :
@@ -408,6 +413,9 @@ public Map<String, MeasurementSchema> getAllSchemasOfCurrentDevice() throws IOEx
408413
if (!currentDevice.equals(iterator.current())) {
409414
continue;
410415
}
416+
if (isCurrentDeviceDataInDeprecatedTable(tsFileResource)) {
417+
continue;
418+
}
411419
MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
412420
iterator.getFirstMeasurementNodeOfCurrentDevice();
413421
TsFileSequenceReader reader = readerMap.get(tsFileResource);
@@ -479,6 +487,10 @@ public Map<TsFileResource, TsFileSequenceReader> getReaderMap() {
479487
return readerMap;
480488
}
481489

490+
public Map<TsFileResource, Set<String>> getDeprecatedTableSchemaMap() {
491+
return deprecatedTableSchemaMap;
492+
}
493+
482494
@Override
483495
public void close() throws IOException {
484496
for (TsFileSequenceReader reader : readerMap.values()) {
@@ -695,4 +707,15 @@ public String nextSeries() throws IllegalPathException {
695707
return readerAndChunkMetadataForThisSeries;
696708
}
697709
}
710+
711+
// whether the current device's table is marked as deprecated for the given resource; such data is skipped
712+
private boolean isCurrentDeviceDataInDeprecatedTable(TsFileResource resource) {
713+
if (ignoreAllNullRows) {
714+
return false;
715+
}
716+
String tableName = currentDevice.getLeft().getTableName();
717+
Set<String> deprecatedTablesInCurrentFile = deprecatedTableSchemaMap.get(resource);
718+
return deprecatedTablesInCurrentFile != null
719+
&& deprecatedTablesInCurrentFile.contains(tableName);
720+
}
698721
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionTableSchemaCollectorTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel;
2121

22-
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTableSchemaNotMatchException;
2322
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
2423

2524
import org.apache.tsfile.enums.ColumnCategory;
@@ -104,11 +103,6 @@ public void test1() {
104103
columnTypeList4.add(ColumnCategory.FIELD);
105104
columnTypeList4.add(ColumnCategory.TAG);
106105
TableSchema tableSchema4 = new TableSchema("t1", measurementSchemaList4, columnTypeList4);
107-
try {
108-
compactionTableSchema.merge(tableSchema4);
109-
} catch (CompactionTableSchemaNotMatchException e) {
110-
return;
111-
}
112-
Assert.fail();
106+
Assert.assertFalse(compactionTableSchema.merge(tableSchema4));
113107
}
114108
}

0 commit comments

Comments
 (0)