Skip to content

Commit 4884dc8

Browse files
authored
[IOTDB-1546] Optimize the Upgrade/Rewrite Tool rewrite logic to reduce the temp memory cost (#3701)
1 parent 8168911 commit 4884dc8

File tree

2 files changed

+150
-149
lines changed

2 files changed

+150
-149
lines changed

server/src/main/java/org/apache/iotdb/db/tools/TsFileRewriteTool.java

Lines changed: 60 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
1920
package org.apache.iotdb.db.tools;
2021

2122
import org.apache.iotdb.db.engine.StorageEngine;
@@ -44,7 +45,6 @@
4445
import org.apache.iotdb.tsfile.v2.read.TsFileSequenceReaderForV2;
4546
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
4647
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
47-
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
4848
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
4949
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
5050

@@ -159,15 +159,19 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
159159
int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
160160
reader.position(headerLength);
161161
// start to scan chunks and chunkGroups
162-
List<List<PageHeader>> pageHeadersInChunkGroup = new ArrayList<>();
163-
List<List<ByteBuffer>> pageDataInChunkGroup = new ArrayList<>();
164-
List<List<Boolean>> needToDecodeInfoInChunkGroup = new ArrayList<>();
165162
byte marker;
166-
List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
167-
String lastChunkGroupDeviceId = null;
163+
164+
String deviceId = null;
165+
boolean firstChunkInChunkGroup = true;
168166
try {
169167
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
170168
switch (marker) {
169+
case MetaMarker.CHUNK_GROUP_HEADER:
170+
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
171+
deviceId = chunkGroupHeader.getDeviceID();
172+
firstChunkInChunkGroup = true;
173+
endChunkGroup();
174+
break;
171175
case MetaMarker.CHUNK_HEADER:
172176
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
173177
ChunkHeader header = reader.readChunkHeader(marker);
@@ -177,7 +181,6 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
177181
header.getDataType(),
178182
header.getEncodingType(),
179183
header.getCompressionType());
180-
measurementSchemaList.add(measurementSchema);
181184
TSDataType dataType = header.getDataType();
182185
TSEncoding encoding = header.getEncodingType();
183186
List<PageHeader> pageHeadersInChunk = new ArrayList<>();
@@ -195,26 +198,14 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
195198
dataInChunk.add(pageData);
196199
dataSize -= pageHeader.getSerializedPageSize();
197200
}
198-
pageHeadersInChunkGroup.add(pageHeadersInChunk);
199-
pageDataInChunkGroup.add(dataInChunk);
200-
needToDecodeInfoInChunkGroup.add(needToDecodeInfo);
201-
break;
202-
case MetaMarker.CHUNK_GROUP_HEADER:
203-
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
204-
String deviceId = chunkGroupHeader.getDeviceID();
205-
if (lastChunkGroupDeviceId != null && !measurementSchemaList.isEmpty()) {
206-
rewrite(
207-
lastChunkGroupDeviceId,
208-
measurementSchemaList,
209-
pageHeadersInChunkGroup,
210-
pageDataInChunkGroup,
211-
needToDecodeInfoInChunkGroup);
212-
pageHeadersInChunkGroup.clear();
213-
pageDataInChunkGroup.clear();
214-
measurementSchemaList.clear();
215-
needToDecodeInfoInChunkGroup.clear();
216-
}
217-
lastChunkGroupDeviceId = deviceId;
201+
reWriteChunk(
202+
deviceId,
203+
firstChunkInChunkGroup,
204+
measurementSchema,
205+
pageHeadersInChunk,
206+
dataInChunk,
207+
needToDecodeInfo);
208+
firstChunkInChunkGroup = false;
218209
break;
219210
case MetaMarker.OPERATION_INDEX_RANGE:
220211
reader.readPlanIndex();
@@ -239,19 +230,7 @@ public void parseAndRewriteFile(List<TsFileResource> rewrittenResources)
239230
MetaMarker.handleUnexpectedMarker(marker);
240231
}
241232
}
242-
243-
if (!measurementSchemaList.isEmpty()) {
244-
rewrite(
245-
lastChunkGroupDeviceId,
246-
measurementSchemaList,
247-
pageHeadersInChunkGroup,
248-
pageDataInChunkGroup,
249-
needToDecodeInfoInChunkGroup);
250-
pageHeadersInChunkGroup.clear();
251-
pageDataInChunkGroup.clear();
252-
measurementSchemaList.clear();
253-
needToDecodeInfoInChunkGroup.clear();
254-
}
233+
endChunkGroup();
255234
// close upgraded tsFiles and generate resources for them
256235
for (TsFileIOWriter tsFileIOWriter : partitionWriterMap.values()) {
257236
rewrittenResources.add(endFileAndGenerateResource(tsFileIOWriter));
@@ -302,44 +281,43 @@ protected boolean checkIfNeedToDecode(
302281
}
303282

304283
/**
305-
* This method is for rewriting the ChunkGroup which data is in the different time partitions. In
306-
* this case, we have to decode the data to points, and then rewrite the data points to different
284+
* This method is for rewriting a Chunk whose data is in different time partitions. In this
285+
* case, we have to decode the data to points, and then rewrite the data points to different
307286
* chunkWriters, and finally write the chunks to their own upgraded TsFiles.
308287
*/
309-
protected void rewrite(
288+
protected void reWriteChunk(
310289
String deviceId,
311-
List<IMeasurementSchema> schemas,
312-
List<List<PageHeader>> pageHeadersInChunkGroup,
313-
List<List<ByteBuffer>> dataInChunkGroup,
314-
List<List<Boolean>> needToDecodeInfoInChunkGroup)
290+
boolean firstChunkInChunkGroup,
291+
MeasurementSchema schema,
292+
List<PageHeader> pageHeadersInChunk,
293+
List<ByteBuffer> pageDataInChunk,
294+
List<Boolean> needToDecodeInfoInChunk)
315295
throws IOException, PageException {
316-
Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup = new HashMap<>();
317-
for (int i = 0; i < schemas.size(); i++) {
318-
IMeasurementSchema schema = schemas.get(i);
319-
List<ByteBuffer> pageDataInChunk = dataInChunkGroup.get(i);
320-
List<PageHeader> pageHeadersInChunk = pageHeadersInChunkGroup.get(i);
321-
List<Boolean> needToDecodeInfoInChunk = needToDecodeInfoInChunkGroup.get(i);
322-
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
323-
for (int j = 0; j < pageDataInChunk.size(); j++) {
324-
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(j))) {
325-
decodeAndWritePageInToFiles(schema, pageDataInChunk.get(j), chunkWritersInChunkGroup);
326-
} else {
327-
writePageInToFile(
328-
schema, pageHeadersInChunk.get(j), pageDataInChunk.get(j), chunkWritersInChunkGroup);
329-
}
296+
valueDecoder = Decoder.getDecoderByType(schema.getEncodingType(), schema.getType());
297+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap = new HashMap<>();
298+
for (int i = 0; i < pageDataInChunk.size(); i++) {
299+
if (Boolean.TRUE.equals(needToDecodeInfoInChunk.get(i))) {
300+
decodeAndWritePage(schema, pageDataInChunk.get(i), partitionChunkWriterMap);
301+
} else {
302+
writePage(
303+
schema, pageHeadersInChunk.get(i), pageDataInChunk.get(i), partitionChunkWriterMap);
330304
}
331305
}
332-
333-
for (Entry<Long, Map<IMeasurementSchema, ChunkWriterImpl>> entry :
334-
chunkWritersInChunkGroup.entrySet()) {
306+
for (Entry<Long, ChunkWriterImpl> entry : partitionChunkWriterMap.entrySet()) {
335307
long partitionId = entry.getKey();
336308
TsFileIOWriter tsFileIOWriter = partitionWriterMap.get(partitionId);
337-
tsFileIOWriter.startChunkGroup(deviceId);
338-
// write chunks to their own upgraded tsFiles
339-
for (IChunkWriter chunkWriter : entry.getValue().values()) {
340-
chunkWriter.writeToFileWriter(tsFileIOWriter);
309+
if (firstChunkInChunkGroup) {
310+
tsFileIOWriter.startChunkGroup(deviceId);
341311
}
342-
tsFileIOWriter.endChunkGroup();
312+
// write chunks to their own upgraded tsFiles
313+
IChunkWriter chunkWriter = entry.getValue();
314+
chunkWriter.writeToFileWriter(tsFileIOWriter);
315+
}
316+
}
317+
318+
protected void endChunkGroup() throws IOException {
319+
for (TsFileIOWriter tsFileIoWriter : partitionWriterMap.values()) {
320+
tsFileIoWriter.endChunkGroup();
343321
}
344322
}
345323

@@ -381,46 +359,42 @@ protected TsFileIOWriter getOrDefaultTsFileIOWriter(File oldTsFile, long partiti
381359
});
382360
}
383361

384-
protected void writePageInToFile(
385-
IMeasurementSchema schema,
362+
protected void writePage(
363+
MeasurementSchema schema,
386364
PageHeader pageHeader,
387365
ByteBuffer pageData,
388-
Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
366+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
389367
throws PageException {
390368
long partitionId = StorageEngine.getTimePartition(pageHeader.getStartTime());
391369
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
392-
Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
393-
chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
394-
ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
370+
ChunkWriterImpl chunkWriter =
371+
partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
395372
chunkWriter.writePageHeaderAndDataIntoBuff(pageData, pageHeader);
396-
chunkWriters.put(schema, chunkWriter);
397-
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
398373
}
399374

400-
protected void decodeAndWritePageInToFiles(
401-
IMeasurementSchema schema,
375+
protected void decodeAndWritePage(
376+
MeasurementSchema schema,
402377
ByteBuffer pageData,
403-
Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup)
378+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap)
404379
throws IOException {
405380
valueDecoder.reset();
406381
PageReader pageReader =
407382
new PageReader(pageData, schema.getType(), valueDecoder, defaultTimeDecoder, null);
408383
BatchData batchData = pageReader.getAllSatisfiedPageData();
409-
rewritePageIntoFiles(batchData, schema, chunkWritersInChunkGroup);
384+
rewritePageIntoFiles(batchData, schema, partitionChunkWriterMap);
410385
}
411386

412387
protected void rewritePageIntoFiles(
413388
BatchData batchData,
414-
IMeasurementSchema schema,
415-
Map<Long, Map<IMeasurementSchema, ChunkWriterImpl>> chunkWritersInChunkGroup) {
389+
MeasurementSchema schema,
390+
Map<Long, ChunkWriterImpl> partitionChunkWriterMap) {
416391
while (batchData.hasCurrent()) {
417392
long time = batchData.currentTime();
418393
Object value = batchData.currentValue();
419394
long partitionId = StorageEngine.getTimePartition(time);
420395

421-
Map<IMeasurementSchema, ChunkWriterImpl> chunkWriters =
422-
chunkWritersInChunkGroup.getOrDefault(partitionId, new HashMap<>());
423-
ChunkWriterImpl chunkWriter = chunkWriters.getOrDefault(schema, new ChunkWriterImpl(schema));
396+
ChunkWriterImpl chunkWriter =
397+
partitionChunkWriterMap.computeIfAbsent(partitionId, v -> new ChunkWriterImpl(schema));
424398
getOrDefaultTsFileIOWriter(oldTsFile, partitionId);
425399
switch (schema.getType()) {
426400
case INT32:
@@ -446,8 +420,6 @@ protected void rewritePageIntoFiles(
446420
String.format("Data type %s is not supported.", schema.getType()));
447421
}
448422
batchData.next();
449-
chunkWriters.put(schema, chunkWriter);
450-
chunkWritersInChunkGroup.put(partitionId, chunkWriters);
451423
}
452424
}
453425

0 commit comments

Comments
 (0)