Skip to content

Commit b4161dd

Browse files
authored
[To dev/1.3] perf: add encodeBatch interface to accelerate flushing for sequential insert (#15244)
* perf: batch encode to improve insert performance * fix IoTDBSimpleQueryIT * move batchEncodeInfo / times as MemTableFlushTask member
1 parent f4b5d0e commit b4161dd

15 files changed

+613
-228
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
import org.apache.iotdb.db.service.metrics.WritingMetrics;
2828
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
2929
import org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
30+
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
3031
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
3132
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
3233
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
3334
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
35+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
3436
import org.apache.iotdb.metrics.utils.MetricLevel;
3537

38+
import org.apache.tsfile.common.conf.TSFileDescriptor;
3639
import org.apache.tsfile.file.metadata.IDeviceID;
3740
import org.apache.tsfile.write.chunk.IChunkWriter;
3841
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -62,6 +65,9 @@ public class MemTableFlushTask {
6265
FlushSubTaskPoolManager.getInstance();
6366
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
6467
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
68+
private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
69+
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
70+
6571
/* storage group name -> last time */
6672
private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>();
6773
private final Future<?> encodingTaskFuture;
@@ -82,6 +88,9 @@ public class MemTableFlushTask {
8288
private volatile long memSerializeTime = 0L;
8389
private volatile long ioTime = 0L;
8490

91+
private final BatchEncodeInfo encodeInfo;
92+
private long[] times;
93+
8594
/**
8695
* @param memTable the memTable to flush
8796
* @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
@@ -98,6 +107,7 @@ public MemTableFlushTask(
98107
this.dataRegionId = dataRegionId;
99108
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
100109
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
110+
this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
101111
LOGGER.debug(
102112
"flush task of database {} memtable is created, flushing to file {}.",
103113
storageGroup,
@@ -248,7 +258,10 @@ public void run() {
248258
} else {
249259
long starTime = System.currentTimeMillis();
250260
IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
251-
writableMemChunk.encode(ioTaskQueue);
261+
if (writableMemChunk instanceof AlignedWritableMemChunk && times == null) {
262+
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
263+
}
264+
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
252265
long subTaskTime = System.currentTimeMillis() - starTime;
253266
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
254267
memSerializeTime += subTaskTime;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2525
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2626
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
27+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2728
import org.apache.iotdb.db.utils.datastructure.TVList;
2829

2930
import org.apache.tsfile.enums.TSDataType;
@@ -189,7 +190,8 @@ public abstract void writeAlignedTablet(
189190
public abstract IChunkWriter createIChunkWriter();
190191

191192
@Override
192-
public abstract void encode(BlockingQueue<Object> ioTaskQueue);
193+
public abstract void encode(
194+
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times);
193195

194196
@Override
195197
public abstract void release();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java

Lines changed: 29 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,20 @@
2424
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
2525
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
2626
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
27+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2728
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
2829
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
2930
import org.apache.iotdb.db.utils.datastructure.TVList;
3031

3132
import org.apache.tsfile.common.conf.TSFileDescriptor;
3233
import org.apache.tsfile.enums.TSDataType;
3334
import org.apache.tsfile.read.common.TimeRange;
34-
import org.apache.tsfile.read.common.block.TsBlock;
3535
import org.apache.tsfile.utils.Binary;
3636
import org.apache.tsfile.utils.BitMap;
3737
import org.apache.tsfile.utils.Pair;
3838
import org.apache.tsfile.write.UnSupportedDataTypeException;
3939
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
4040
import org.apache.tsfile.write.chunk.IChunkWriter;
41-
import org.apache.tsfile.write.chunk.ValueChunkWriter;
4241
import org.apache.tsfile.write.schema.IMeasurementSchema;
4342
import org.apache.tsfile.write.schema.MeasurementSchema;
4443

@@ -64,6 +63,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
6463
private final List<IMeasurementSchema> schemaList;
6564
private AlignedTVList list;
6665
private List<AlignedTVList> sortedList;
66+
private long sortedRowCount = 0;
6767

6868
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
6969
private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
@@ -191,6 +191,7 @@ protected void handoverAlignedTvList() {
191191
list.sort();
192192
}
193193
sortedList.add(list);
194+
this.sortedRowCount += list.rowCount();
194195
this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
195196
this.dataTypes = list.getTsDataTypes();
196197
}
@@ -241,15 +242,11 @@ public long count() {
241242

242243
@Override
243244
public long rowCount() {
244-
return alignedListSize();
245+
return sortedRowCount + list.rowCount();
245246
}
246247

247248
public int alignedListSize() {
248-
int rowCount = list.rowCount();
249-
for (AlignedTVList alignedTvList : sortedList) {
250-
rowCount += alignedTvList.rowCount();
251-
}
252-
return rowCount;
249+
return (int) rowCount();
253250
}
254251

255252
@Override
@@ -516,107 +513,55 @@ private void handleEncoding(
516513
}
517514

518515
@Override
519-
public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
516+
public synchronized void encode(
517+
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) {
520518
if (TVLIST_SORT_THRESHOLD == 0) {
521519
encodeWorkingAlignedTVList(ioTaskQueue);
522520
return;
523521
}
524522

525523
AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
524+
526525
// create MergeSortAlignedTVListIterator.
527526
List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
528527
alignedTvLists.add(list);
528+
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
529529
MemPointIterator timeValuePairIterator =
530-
MemPointIteratorFactory.create(dataTypes, null, alignedTvLists);
531-
532-
int pointNumInPage = 0;
533-
int pointNumInChunk = 0;
534-
long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
530+
MemPointIteratorFactory.create(dataTypes, columnIndexList, alignedTvLists);
535531

536532
while (timeValuePairIterator.hasNextBatch()) {
537-
TsBlock tsBlock = timeValuePairIterator.nextBatch();
538-
if (tsBlock == null) {
539-
continue;
533+
timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
534+
if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
535+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
536+
encodeInfo.pointNumInPage = 0;
540537
}
541-
for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount(); rowIndex++) {
542-
long time = tsBlock.getTimeByIndex(rowIndex);
543-
times[pointNumInPage] = time;
544538

545-
for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
546-
ValueChunkWriter valueChunkWriter =
547-
alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
548-
if (tsBlock.getColumn(columnIndex).isNull(rowIndex)) {
549-
valueChunkWriter.write(time, null, true);
550-
continue;
551-
}
552-
switch (schemaList.get(columnIndex).getType()) {
553-
case BOOLEAN:
554-
valueChunkWriter.write(
555-
time, tsBlock.getColumn(columnIndex).getBoolean(rowIndex), false);
556-
break;
557-
case INT32:
558-
case DATE:
559-
valueChunkWriter.write(time, tsBlock.getColumn(columnIndex).getInt(rowIndex), false);
560-
break;
561-
case INT64:
562-
case TIMESTAMP:
563-
valueChunkWriter.write(time, tsBlock.getColumn(columnIndex).getLong(rowIndex), false);
564-
break;
565-
case FLOAT:
566-
valueChunkWriter.write(
567-
time, tsBlock.getColumn(columnIndex).getFloat(rowIndex), false);
568-
break;
569-
case DOUBLE:
570-
valueChunkWriter.write(
571-
time, tsBlock.getColumn(columnIndex).getDouble(rowIndex), false);
572-
break;
573-
case TEXT:
574-
case BLOB:
575-
case STRING:
576-
valueChunkWriter.write(
577-
time, tsBlock.getColumn(columnIndex).getBinary(rowIndex), false);
578-
break;
579-
default:
580-
break;
581-
}
582-
}
583-
pointNumInPage++;
584-
pointNumInChunk++;
585-
586-
// new page
587-
if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
588-
|| pointNumInChunk >= maxNumberOfPointsInChunk) {
589-
alignedChunkWriter.write(times, pointNumInPage, 0);
590-
pointNumInPage = 0;
591-
}
592-
593-
// new chunk
594-
if (pointNumInChunk >= maxNumberOfPointsInChunk) {
595-
alignedChunkWriter.sealCurrentPage();
596-
alignedChunkWriter.clearPageWriter();
597-
try {
598-
ioTaskQueue.put(alignedChunkWriter);
599-
} catch (InterruptedException e) {
600-
Thread.currentThread().interrupt();
601-
}
602-
alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
603-
pointNumInChunk = 0;
539+
if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
540+
alignedChunkWriter.sealCurrentPage();
541+
alignedChunkWriter.clearPageWriter();
542+
try {
543+
ioTaskQueue.put(alignedChunkWriter);
544+
} catch (InterruptedException e) {
545+
Thread.currentThread().interrupt();
604546
}
547+
alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
548+
encodeInfo.reset();
605549
}
606550
}
607551

608552
// last batch of points
609-
if (pointNumInChunk > 0) {
610-
if (pointNumInPage > 0) {
611-
alignedChunkWriter.write(times, pointNumInPage, 0);
612-
alignedChunkWriter.sealCurrentPage();
613-
alignedChunkWriter.clearPageWriter();
553+
if (encodeInfo.pointNumInChunk > 0) {
554+
if (encodeInfo.pointNumInPage > 0) {
555+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
614556
}
557+
alignedChunkWriter.sealCurrentPage();
558+
alignedChunkWriter.clearPageWriter();
615559
try {
616560
ioTaskQueue.put(alignedChunkWriter);
617561
} catch (InterruptedException e) {
618562
Thread.currentThread().interrupt();
619563
}
564+
encodeInfo.reset();
620565
}
621566
}
622567

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2020

2121
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
22+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2223
import org.apache.iotdb.db.utils.datastructure.TVList;
2324

2425
import org.apache.tsfile.enums.TSDataType;
@@ -108,7 +109,7 @@ default long getMinTime() {
108109

109110
IChunkWriter createIChunkWriter();
110111

111-
void encode(BlockingQueue<Object> ioTaskQueue);
112+
void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times);
112113

113114
void release();
114115

0 commit comments

Comments
 (0)