Skip to content

Commit f2b599e

Browse files
committed
replace region id for object binary
1 parent 64f9665 commit f2b599e

File tree

6 files changed

+80
-11
lines changed

6 files changed

+80
-11
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,8 @@ public static void removeDeletedObjectFiles(
533533
TsFileSequenceReader reader,
534534
List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
535535
List<ModEntry> timeMods,
536-
List<List<ModEntry>> valueMods)
536+
List<List<ModEntry>> valueMods,
537+
int currentRegionId)
537538
throws IOException {
538539
if (alignedChunkMetadataList.isEmpty()) {
539540
return;
@@ -578,7 +579,8 @@ public static void removeDeletedObjectFiles(
578579
objectColumnIndexList,
579580
timeDeletionIntervalList,
580581
objectDeletionIntervalList,
581-
deletionCursors);
582+
deletionCursors,
583+
currentRegionId);
582584
}
583585
}
584586

@@ -589,7 +591,8 @@ private static void removeDeletedObjectFiles(
589591
List<Integer> objectColumnIndexList,
590592
List<ModEntry> timeDeletions,
591593
List<List<ModEntry>> objectDeletions,
592-
int[] deletionCursors)
594+
int[] deletionCursors,
595+
int currentRegionId)
593596
throws IOException {
594597
Chunk timeChunk =
595598
reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
@@ -612,6 +615,12 @@ private static void removeDeletedObjectFiles(
612615
continue;
613616
}
614617
Chunk chunk = reader.readMemChunk(valueChunkMetadata);
618+
if (chunk != null) {
619+
chunk
620+
.getHeader()
621+
.setReplaceDecoder(
622+
decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, currentRegionId));
623+
}
615624
valueChunks.add(chunk);
616625
valuePages.add(
617626
chunk == null

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,8 @@ private void applyModificationForAlignedChunkMetadataList(
489489
readerMap.get(tsFileResource),
490490
alignedChunkMetadataList,
491491
Collections.singletonList(ttlDeletion),
492-
modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()));
492+
modificationForValueColumns.stream().map(v -> emptyList).collect(Collectors.toList()),
493+
tsFileResource.getTsFileID().regionId);
493494
}
494495

495496
ModificationUtils.modifyAlignedChunkMetaData(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ protected List<AbstractAlignedChunkMetadata> getAlignedChunkMetadataList(TsFileR
284284
readerCacheMap.get(resource),
285285
alignedChunkMetadataList,
286286
Collections.singletonList(ttlDeletion),
287-
valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()));
287+
valueModifications.stream().map(v -> emptyList).collect(Collectors.toList()),
288+
resource.getTsFileID().regionId);
288289
}
289290

290291
// modify aligned chunk metadatas

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public IPointReader getPagePointReader(
9292
ByteBuffer compressedTimePageData,
9393
List<ByteBuffer> compressedValuePageDatas)
9494
throws IOException {
95-
return getPontReader(
95+
return getPointReader(
9696
timePageHeader,
9797
valuePageHeaders,
9898
compressedTimePageData,
@@ -106,11 +106,11 @@ public IPointReader getBatchedPagePointReader(
106106
ByteBuffer compressedTimePageData,
107107
List<ByteBuffer> compressedValuePageDatas)
108108
throws IOException {
109-
return getPontReader(
109+
return getPointReader(
110110
timePageHeader, valuePageHeaders, compressedTimePageData, compressedValuePageDatas, false);
111111
}
112112

113-
private IPointReader getPontReader(
113+
private IPointReader getPointReader(
114114
PageHeader timePageHeader,
115115
List<PageHeader> valuePageHeaders,
116116
ByteBuffer compressedTimePageData,
@@ -146,7 +146,7 @@ private IPointReader getPontReader(
146146
valuePageHeaders.get(i),
147147
uncompressedPageData,
148148
valueType,
149-
Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType));
149+
valueChunkHeader.calculateDecoderForNonTimeChunk());
150150
valuePageReader.setDeleteIntervalList(valueDeleteIntervalList.get(i));
151151
valuePageReaders.add(valuePageReader);
152152
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/DiskAlignedChunkLoader.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import org.apache.iotdb.db.storageengine.buffer.ChunkCache;
2525
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
2626
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
27+
import org.apache.iotdb.db.utils.ObjectTypeUtils;
2728

29+
import org.apache.tsfile.enums.TSDataType;
2830
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
2931
import org.apache.tsfile.file.metadata.ChunkMetadata;
3032
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -92,7 +94,7 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi
9294
context);
9395
List<Chunk> valueChunkList = new ArrayList<>();
9496
for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) {
95-
valueChunkList.add(
97+
Chunk chunk =
9698
valueChunkMetadata == null
9799
? null
98100
: ChunkCache.getInstance()
@@ -104,7 +106,17 @@ public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter globalTi
104106
resource.isClosed()),
105107
valueChunkMetadata.getDeleteIntervalList(),
106108
valueChunkMetadata.getStatistics(),
107-
context));
109+
context);
110+
final TsFileID tsFileID = getTsFileID();
111+
if (chunk != null
112+
&& tsFileID.regionId > 0
113+
&& chunkMetaData.getDataType() == TSDataType.OBJECT) {
114+
chunk
115+
.getHeader()
116+
.setReplaceDecoder(
117+
decoder -> ObjectTypeUtils.getReplaceDecoder(decoder, tsFileID.regionId));
118+
}
119+
valueChunkList.add(chunk);
108120
}
109121

110122
long t2 = System.nanoTime();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@
3535
import org.apache.iotdb.mpp.rpc.thrift.TReadObjectReq;
3636
import org.apache.iotdb.rpc.TSStatusCode;
3737

38+
import org.apache.tsfile.encoding.decoder.Decoder;
39+
import org.apache.tsfile.encoding.decoder.DecoderWrapper;
3840
import org.apache.tsfile.utils.Binary;
41+
import org.apache.tsfile.utils.BytesUtils;
3942
import org.apache.tsfile.utils.Pair;
4043
import org.slf4j.Logger;
4144
import org.slf4j.LoggerFactory;
@@ -44,7 +47,9 @@
4447
import java.io.IOException;
4548
import java.nio.ByteBuffer;
4649
import java.nio.channels.FileChannel;
50+
import java.nio.charset.StandardCharsets;
4751
import java.nio.file.Files;
52+
import java.nio.file.Path;
4853
import java.nio.file.Paths;
4954
import java.nio.file.StandardOpenOption;
5055
import java.util.Collections;
@@ -147,6 +152,47 @@ private static ByteBuffer readObjectContentFromRemoteFile(
147152
return buffer;
148153
}
149154

155+
public static Binary generateObjectBinary(long objectSize, String relativePath) {
156+
byte[] filePathBytes = relativePath.getBytes(StandardCharsets.UTF_8);
157+
byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
158+
System.arraycopy(BytesUtils.longToBytes(objectSize), 0, valueBytes, 0, Long.BYTES);
159+
System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
160+
return new Binary(valueBytes);
161+
}
162+
163+
public static DecoderWrapper getReplaceDecoder(final Decoder decoder, final int newRegionId) {
164+
return new ObjectRegionIdReplaceDecoder(decoder, newRegionId);
165+
}
166+
167+
private static class ObjectRegionIdReplaceDecoder extends DecoderWrapper {
168+
169+
private final int newRegionId;
170+
171+
public ObjectRegionIdReplaceDecoder(Decoder decoder, int newRegionId) {
172+
super(decoder);
173+
this.newRegionId = newRegionId;
174+
}
175+
176+
@Override
177+
public Binary readBinary(ByteBuffer buffer) {
178+
Binary originValue = originDecoder.readBinary(buffer);
179+
Pair<Long, String> pair = ObjectTypeUtils.parseObjectBinary(originValue);
180+
try {
181+
Path path = Paths.get(pair.getRight());
182+
int regionId = Integer.parseInt(path.getName(0).toString());
183+
if (regionId == newRegionId) {
184+
return originValue;
185+
}
186+
String newPath = pair.getRight().replaceFirst(regionId + "", newRegionId + "");
187+
return ObjectTypeUtils.generateObjectBinary(pair.getLeft(), newPath);
188+
} catch (NumberFormatException e) {
189+
throw new IoTDBRuntimeException(
190+
"wrong object file path: " + pair.getRight(),
191+
TSStatusCode.OBJECT_READ_ERROR.getStatusCode());
192+
}
193+
}
194+
}
195+
150196
public static int getActualReadSize(String filePath, long fileSize, long offset, long length) {
151197
if (offset >= fileSize) {
152198
throw new SemanticException(

0 commit comments

Comments
 (0)