Skip to content

Commit ef8dc52

Browse files
authored
perf: add encodeBatch interface to accelerate flushing for sequential insert (apache#15243)
* perf: batch encode to improve insert performance * fix IoTDBSimpleQueryIT * move batchEncodeInfo / times as MemTableFlushTask member
1 parent eccbcc2 commit ef8dc52

15 files changed

+620
-229
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
import org.apache.iotdb.db.service.metrics.WritingMetrics;
2828
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
2929
import org.apache.iotdb.db.storageengine.dataregion.flush.pool.FlushSubTaskPoolManager;
30+
import org.apache.iotdb.db.storageengine.dataregion.memtable.AlignedWritableMemChunk;
3031
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
3132
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
3233
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
3334
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
35+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
3436
import org.apache.iotdb.metrics.utils.MetricLevel;
3537

38+
import org.apache.tsfile.common.conf.TSFileDescriptor;
3639
import org.apache.tsfile.file.metadata.IDeviceID;
3740
import org.apache.tsfile.write.chunk.IChunkWriter;
3841
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -62,6 +65,9 @@ public class MemTableFlushTask {
6265
FlushSubTaskPoolManager.getInstance();
6366
private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance();
6467
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
68+
private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
69+
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
70+
6571
/* storage group name -> last time */
6672
private static final Map<String, Long> flushPointsCache = new ConcurrentHashMap<>();
6773
private final Future<?> encodingTaskFuture;
@@ -82,6 +88,9 @@ public class MemTableFlushTask {
8288
private volatile long memSerializeTime = 0L;
8389
private volatile long ioTime = 0L;
8490

91+
private final BatchEncodeInfo encodeInfo;
92+
private long[] times;
93+
8594
/**
8695
* @param memTable the memTable to flush
8796
* @param writer the writer where memTable will be flushed to (current tsfile writer or vm writer)
@@ -98,6 +107,7 @@ public MemTableFlushTask(
98107
this.dataRegionId = dataRegionId;
99108
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
100109
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
110+
this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
101111
LOGGER.debug(
102112
"flush task of database {} memtable is created, flushing to file {}.",
103113
storageGroup,
@@ -248,7 +258,10 @@ public void run() {
248258
} else {
249259
long starTime = System.currentTimeMillis();
250260
IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
251-
writableMemChunk.encode(ioTaskQueue);
261+
if (writableMemChunk instanceof AlignedWritableMemChunk && times == null) {
262+
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
263+
}
264+
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
252265
long subTaskTime = System.currentTimeMillis() - starTime;
253266
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
254267
memSerializeTime += subTaskTime;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2626
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2727
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
28+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2829
import org.apache.iotdb.db.utils.datastructure.TVList;
2930

3031
import org.apache.tsfile.enums.TSDataType;
@@ -192,7 +193,8 @@ public abstract void writeAlignedTablet(
192193
public abstract IChunkWriter createIChunkWriter();
193194

194195
@Override
195-
public abstract void encode(BlockingQueue<Object> ioTaskQueue);
196+
public abstract void encode(
197+
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times);
196198

197199
@Override
198200
public abstract void release();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java

Lines changed: 30 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,20 @@
2525
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
2626
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
2727
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
28+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2829
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
2930
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
3031
import org.apache.iotdb.db.utils.datastructure.TVList;
3132

3233
import org.apache.tsfile.common.conf.TSFileDescriptor;
3334
import org.apache.tsfile.enums.TSDataType;
3435
import org.apache.tsfile.read.common.TimeRange;
35-
import org.apache.tsfile.read.common.block.TsBlock;
3636
import org.apache.tsfile.utils.Binary;
3737
import org.apache.tsfile.utils.BitMap;
3838
import org.apache.tsfile.utils.Pair;
3939
import org.apache.tsfile.write.UnSupportedDataTypeException;
4040
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
4141
import org.apache.tsfile.write.chunk.IChunkWriter;
42-
import org.apache.tsfile.write.chunk.ValueChunkWriter;
4342
import org.apache.tsfile.write.schema.IMeasurementSchema;
4443
import org.apache.tsfile.write.schema.MeasurementSchema;
4544

@@ -65,6 +64,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk {
6564
private final List<IMeasurementSchema> schemaList;
6665
private AlignedTVList list;
6766
private List<AlignedTVList> sortedList;
67+
private long sortedRowCount = 0;
6868
private final boolean ignoreAllNullRows;
6969

7070
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
@@ -197,6 +197,7 @@ protected void handoverAlignedTvList() {
197197
list.sort();
198198
}
199199
sortedList.add(list);
200+
this.sortedRowCount += list.rowCount();
200201
this.list = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes));
201202
this.dataTypes = list.getTsDataTypes();
202203
}
@@ -352,15 +353,11 @@ public long count() {
352353

353354
@Override
354355
public long rowCount() {
355-
return alignedListSize();
356+
return sortedRowCount + list.rowCount();
356357
}
357358

358359
public int alignedListSize() {
359-
int rowCount = list.rowCount();
360-
for (AlignedTVList alignedTvList : sortedList) {
361-
rowCount += alignedTvList.rowCount();
362-
}
363-
return rowCount;
360+
return (int) rowCount();
364361
}
365362

366363
@Override
@@ -638,107 +635,56 @@ private void handleEncoding(
638635
}
639636

640637
@Override
641-
public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
638+
public synchronized void encode(
639+
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) {
642640
if (TVLIST_SORT_THRESHOLD == 0) {
643641
encodeWorkingAlignedTVList(ioTaskQueue);
644642
return;
645643
}
646644

647645
AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
646+
648647
// create MergeSortAlignedTVListIterator.
649648
List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
650649
alignedTvLists.add(list);
650+
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
651651
MemPointIterator timeValuePairIterator =
652-
MemPointIteratorFactory.create(dataTypes, null, alignedTvLists, ignoreAllNullRows);
653-
654-
int pointNumInPage = 0;
655-
int pointNumInChunk = 0;
656-
long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
652+
MemPointIteratorFactory.create(
653+
dataTypes, columnIndexList, alignedTvLists, ignoreAllNullRows);
657654

658655
while (timeValuePairIterator.hasNextBatch()) {
659-
TsBlock tsBlock = timeValuePairIterator.nextBatch();
660-
if (tsBlock == null) {
661-
continue;
656+
timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
657+
if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
658+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
659+
encodeInfo.pointNumInPage = 0;
662660
}
663-
for (int rowIndex = 0; rowIndex < tsBlock.getPositionCount(); rowIndex++) {
664-
long time = tsBlock.getTimeByIndex(rowIndex);
665-
times[pointNumInPage] = time;
666661

667-
for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
668-
ValueChunkWriter valueChunkWriter =
669-
alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
670-
if (tsBlock.getColumn(columnIndex).isNull(rowIndex)) {
671-
valueChunkWriter.write(time, null, true);
672-
continue;
673-
}
674-
switch (schemaList.get(columnIndex).getType()) {
675-
case BOOLEAN:
676-
valueChunkWriter.write(
677-
time, tsBlock.getColumn(columnIndex).getBoolean(rowIndex), false);
678-
break;
679-
case INT32:
680-
case DATE:
681-
valueChunkWriter.write(time, tsBlock.getColumn(columnIndex).getInt(rowIndex), false);
682-
break;
683-
case INT64:
684-
case TIMESTAMP:
685-
valueChunkWriter.write(time, tsBlock.getColumn(columnIndex).getLong(rowIndex), false);
686-
break;
687-
case FLOAT:
688-
valueChunkWriter.write(
689-
time, tsBlock.getColumn(columnIndex).getFloat(rowIndex), false);
690-
break;
691-
case DOUBLE:
692-
valueChunkWriter.write(
693-
time, tsBlock.getColumn(columnIndex).getDouble(rowIndex), false);
694-
break;
695-
case TEXT:
696-
case BLOB:
697-
case STRING:
698-
valueChunkWriter.write(
699-
time, tsBlock.getColumn(columnIndex).getBinary(rowIndex), false);
700-
break;
701-
default:
702-
break;
703-
}
704-
}
705-
pointNumInPage++;
706-
pointNumInChunk++;
707-
708-
// new page
709-
if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
710-
|| pointNumInChunk >= maxNumberOfPointsInChunk) {
711-
alignedChunkWriter.write(times, pointNumInPage, 0);
712-
pointNumInPage = 0;
713-
}
714-
715-
// new chunk
716-
if (pointNumInChunk >= maxNumberOfPointsInChunk) {
717-
alignedChunkWriter.sealCurrentPage();
718-
alignedChunkWriter.clearPageWriter();
719-
try {
720-
ioTaskQueue.put(alignedChunkWriter);
721-
} catch (InterruptedException e) {
722-
Thread.currentThread().interrupt();
723-
}
724-
alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
725-
pointNumInChunk = 0;
662+
if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
663+
alignedChunkWriter.sealCurrentPage();
664+
alignedChunkWriter.clearPageWriter();
665+
try {
666+
ioTaskQueue.put(alignedChunkWriter);
667+
} catch (InterruptedException e) {
668+
Thread.currentThread().interrupt();
726669
}
670+
alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
671+
encodeInfo.reset();
727672
}
728673
}
729674

730675
// last batch of points
731-
if (pointNumInChunk > 0) {
732-
if (pointNumInPage > 0) {
733-
alignedChunkWriter.write(times, pointNumInPage, 0);
734-
alignedChunkWriter.sealCurrentPage();
735-
alignedChunkWriter.clearPageWriter();
676+
if (encodeInfo.pointNumInChunk > 0) {
677+
if (encodeInfo.pointNumInPage > 0) {
678+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
736679
}
680+
alignedChunkWriter.sealCurrentPage();
681+
alignedChunkWriter.clearPageWriter();
737682
try {
738683
ioTaskQueue.put(alignedChunkWriter);
739684
} catch (InterruptedException e) {
740685
Thread.currentThread().interrupt();
741686
}
687+
encodeInfo.reset();
742688
}
743689
}
744690

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2222
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
23+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
2324
import org.apache.iotdb.db.utils.datastructure.TVList;
2425

2526
import org.apache.tsfile.enums.TSDataType;
@@ -111,7 +112,7 @@ default long getMinTime() {
111112

112113
IChunkWriter createIChunkWriter();
113114

114-
void encode(BlockingQueue<Object> ioTaskQueue);
115+
void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times);
115116

116117
void release();
117118

0 commit comments

Comments
 (0)