Skip to content

Commit e879c54

Browse files
authored
Reduce duplicated DeviceID in compaction selection (#16314)
* reduce duplicated DeviceID in compaction selection * fix ut
1 parent fb34acb commit e879c54

File tree

9 files changed

+97
-11
lines changed

9 files changed

+97
-11
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,11 @@ public static boolean isDiskHasSpace(double redundancy) {
482482

483483
public static ArrayDeviceTimeIndex buildDeviceTimeIndex(TsFileResource resource)
484484
throws IOException {
485+
return buildDeviceTimeIndex(resource, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
486+
}
487+
488+
public static ArrayDeviceTimeIndex buildDeviceTimeIndex(
489+
TsFileResource resource, IDeviceID.Deserializer deserializer) throws IOException {
485490
long resourceFileSize =
486491
new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).length();
487492
CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@
3333
import org.apache.iotdb.db.utils.EncryptDBUtils;
3434

3535
import org.apache.tsfile.encrypt.EncryptParameter;
36+
import org.apache.tsfile.file.metadata.IDeviceID;
3637

38+
import java.io.IOException;
39+
import java.io.InputStream;
40+
import java.nio.ByteBuffer;
3741
import java.util.HashMap;
3842
import java.util.HashSet;
3943
import java.util.Map;
@@ -53,6 +57,7 @@ public class CompactionScheduleContext {
5357
// end region
5458

5559
private final Map<TsFileResource, ArrayDeviceTimeIndex> partitionFileDeviceInfoCache;
60+
private final Map<IDeviceID, IDeviceID> deviceIdCache;
5661
private long cachedDeviceInfoSize = 0;
5762

5863
private final Set<Long> timePartitionsDelayInsertionSelection;
@@ -63,12 +68,14 @@ public class CompactionScheduleContext {
6368
public CompactionScheduleContext() {
6469
this.partitionFileDeviceInfoCache = new HashMap<>();
6570
this.timePartitionsDelayInsertionSelection = new HashSet<>();
71+
this.deviceIdCache = new HashMap<>();
6672
this.encryptParameter = EncryptDBUtils.getDefaultFirstEncryptParam();
6773
}
6874

6975
public CompactionScheduleContext(EncryptParameter encryptParameter) {
7076
this.partitionFileDeviceInfoCache = new HashMap<>();
7177
this.timePartitionsDelayInsertionSelection = new HashSet<>();
78+
this.deviceIdCache = new HashMap<>();
7279
this.encryptParameter = encryptParameter;
7380
}
7481

@@ -95,6 +102,7 @@ public ArrayDeviceTimeIndex getResourceDeviceInfo(TsFileResource resource) {
95102

96103
public void clearTimePartitionDeviceInfoCache() {
97104
partitionFileDeviceInfoCache.clear();
105+
deviceIdCache.clear();
98106
CompactionMetrics.getInstance()
99107
.decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize);
100108
cachedDeviceInfoSize = 0;
@@ -209,4 +217,23 @@ public ICrossCompactionPerformer getCrossCompactionPerformer() {
209217
.getCrossCompactionPerformer()
210218
.createInstance(encryptParameter);
211219
}
220+
221+
public IDeviceID.Deserializer getCachedDeviceIdDeserializer() {
222+
return new CachedIDeviceIdDeserializer();
223+
}
224+
225+
private class CachedIDeviceIdDeserializer implements IDeviceID.Deserializer {
226+
227+
@Override
228+
public IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
229+
IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer);
230+
return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
231+
}
232+
233+
@Override
234+
public IDeviceID deserializeFrom(InputStream inputStream) throws IOException {
235+
IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
236+
return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId);
237+
}
238+
}
212239
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,12 @@ private void prepareDeviceInfos() throws IOException {
100100
deviceTimeIndex = new ArrayDeviceTimeIndex();
101101
return;
102102
}
103-
deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource);
103+
deviceTimeIndex =
104+
CompactionUtils.buildDeviceTimeIndex(
105+
resource,
106+
compactionScheduleContext == null
107+
? IDeviceID.Deserializer.DEFAULT_DESERIALIZER
108+
: compactionScheduleContext.getCachedDeviceIdDeserializer());
104109
} finally {
105110
resource.readUnlock();
106111
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ public void deserialize() throws IOException {
320320
try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) {
321321
// The first byte is VERSION_NUMBER, second byte is timeIndexType.
322322
ReadWriteIOUtils.readByte(inputStream);
323-
timeIndex = ITimeIndex.createTimeIndex(inputStream);
323+
timeIndex =
324+
ITimeIndex.createTimeIndex(inputStream, IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
324325
maxPlanIndex = ReadWriteIOUtils.readLong(inputStream);
325326
minPlanIndex = ReadWriteIOUtils.readLong(inputStream);
326327

@@ -676,7 +677,8 @@ public Set<IDeviceID> getDevices() {
676677
return timeIndex.getDevices(file.getPath(), this);
677678
}
678679

679-
public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
680+
public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer)
681+
throws IOException {
680682
readLock();
681683
try {
682684
if (!resourceFileExists()) {
@@ -686,7 +688,8 @@ public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
686688
FSFactoryProducer.getFSFactory()
687689
.getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
688690
ReadWriteIOUtils.readByte(inputStream);
689-
ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
691+
ITimeIndex timeIndexFromResourceFile =
692+
ITimeIndex.createTimeIndex(inputStream, deserializer);
690693
if (!(timeIndexFromResourceFile instanceof ArrayDeviceTimeIndex)) {
691694
throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath());
692695
}
@@ -700,6 +703,10 @@ public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
700703
}
701704
}
702705

706+
public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException {
707+
return buildDeviceTimeIndex(IDeviceID.Deserializer.DEFAULT_DESERIALIZER);
708+
}
709+
703710
/**
704711
* Used for compaction to verify tsfile, also used to verify TimeIndex version when loading tsfile
705712
*/

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ public void serialize(OutputStream outputStream) throws IOException {
113113
}
114114

115115
@Override
116-
public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
116+
public ArrayDeviceTimeIndex deserialize(
117+
InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
117118
int deviceNum = ReadWriteIOUtils.readInt(inputStream);
118119

119120
startTimes = new long[deviceNum];
@@ -127,7 +128,7 @@ public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws IOExcept
127128
}
128129

129130
for (int i = 0; i < deviceNum; i++) {
130-
IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream);
131+
IDeviceID deviceID = deserializer.deserializeFrom(inputStream);
131132
int index = ReadWriteIOUtils.readInt(inputStream);
132133
deviceToIndex.put(deviceID, index);
133134
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public void serialize(OutputStream outputStream) throws IOException {
7272
}
7373

7474
@Override
75-
public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
75+
public FileTimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
76+
throws IOException {
7677
throw new UnsupportedOperationException();
7778
}
7879

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public interface ITimeIndex {
5353
* @param inputStream inputStream
5454
* @return TimeIndex
5555
*/
56-
ITimeIndex deserialize(InputStream inputStream) throws IOException;
56+
ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer)
57+
throws IOException;
5758

5859
/**
5960
* deserialize from byte buffer
@@ -218,11 +219,14 @@ Pair<Long, Long> getPossibleStartTimeAndEndTime(
218219
*/
219220
byte getTimeIndexType();
220221

221-
static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException {
222+
static ITimeIndex createTimeIndex(InputStream inputStream, IDeviceID.Deserializer deserializer)
223+
throws IOException {
222224
byte timeIndexType = ReadWriteIOUtils.readByte(inputStream);
223225
if (timeIndexType == -1) {
224226
throw new IOException("The end of stream has been reached");
225227
}
226-
return TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream);
228+
return TimeIndexLevel.valueOf(timeIndexType)
229+
.getTimeIndex()
230+
.deserialize(inputStream, deserializer);
227231
}
228232
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public void serialize(OutputStream outputStream) throws IOException {
4141
}
4242

4343
@Override
44-
public PlainDeviceTimeIndex deserialize(InputStream inputStream) throws IOException {
44+
public PlainDeviceTimeIndex deserialize(
45+
InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException {
4546
int deviceNum = ReadWriteIOUtils.readInt(inputStream);
4647

4748
startTimes = new long[deviceNum];

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer;
3030
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
3131
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
32+
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
3233
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler;
3334
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
3435
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority;
@@ -42,7 +43,10 @@
4243
import org.apache.iotdb.db.utils.EnvironmentUtils;
4344
import org.apache.iotdb.db.utils.constant.TestConstant;
4445

46+
import org.apache.tsfile.file.metadata.IDeviceID;
47+
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
4548
import org.junit.After;
49+
import org.junit.Assert;
4650
import org.junit.Before;
4751
import org.junit.Test;
4852
import org.slf4j.Logger;
@@ -1839,6 +1843,37 @@ public void testLargeFileInLowerLevel() throws Exception {
18391843
}
18401844
}
18411845

1846+
@Test
1847+
public void testManyDuplicatedDevicesInDifferentResources() throws IOException {
1848+
String sgName = COMPACTION_TEST_SG + "test18";
1849+
TsFileResource resource1 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
1850+
IDeviceID device = new StringArrayDeviceID("root.test.d1");
1851+
resource1.updateStartTime(device, 1);
1852+
resource1.updateStartTime(device, 2);
1853+
File dir = resource1.getTsFile().getParentFile();
1854+
if (!dir.exists()) {
1855+
dir.mkdirs();
1856+
}
1857+
resource1.serialize();
1858+
resource1.degradeTimeIndex();
1859+
1860+
TsFileResource resource2 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName);
1861+
device = new StringArrayDeviceID("root.test.d1");
1862+
resource2.updateStartTime(device, 1);
1863+
resource2.updateStartTime(device, 2);
1864+
resource2.serialize();
1865+
resource2.degradeTimeIndex();
1866+
1867+
CompactionScheduleContext context = new CompactionScheduleContext();
1868+
IDeviceID.Deserializer deserializer = context.getCachedDeviceIdDeserializer();
1869+
1870+
IDeviceID deserializedFromResource1 =
1871+
resource1.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
1872+
IDeviceID deserializedFromResource2 =
1873+
resource2.buildDeviceTimeIndex(deserializer).getDevices().iterator().next();
1874+
Assert.assertSame(deserializedFromResource1, deserializedFromResource2);
1875+
}
1876+
18421877
public void stopCompactionTaskManager() {
18431878
CompactionTaskManager.getInstance().clearCandidateQueue();
18441879
long sleepTime = 0;

0 commit comments

Comments
 (0)