Skip to content

Commit 4582316

Browse files
shuwenweiJackieTien97
authored andcommitted
Fix slowQueryThreshold & optimize encodeBatch (#16765)
(cherry picked from commit 03b60d1)
1 parent 60e8c63 commit 4582316

File tree

3 files changed

+72
-3
lines changed

3 files changed

+72
-3
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ public class IoTDBConfig {
812812
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
813813

814814
/** time cost(ms) threshold for slow query. Unit: millisecond */
815-
private long slowQueryThreshold = 30000;
815+
private long slowQueryThreshold = 10000;
816816

817817
private int patternMatchingThreshold = 1000000;
818818

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ private Object getAlignedValueForQuery(
278278
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
279279
}
280280

281+
@SuppressWarnings("java:S6541")
281282
private TsPrimitiveType getAlignedValueByValueIndex(
282283
int valueIndex,
283284
int[] validIndexesForTimeDuplicatedRows,
@@ -1051,6 +1052,7 @@ public static long valueListArrayMemCost(TSDataType type) {
10511052
}
10521053

10531054
/** Build TsBlock by column. */
1055+
@SuppressWarnings("java:S6541")
10541056
public TsBlock buildTsBlock(
10551057
int floatPrecision,
10561058
List<TSEncoding> encodingList,
@@ -1371,6 +1373,7 @@ public void serializeToWAL(IWALByteBufferView buffer) {
13711373
}
13721374
}
13731375

1376+
@SuppressWarnings("java:S6541")
13741377
public static AlignedTVList deserialize(DataInputStream stream) throws IOException {
13751378
TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
13761379
if (dataType != TSDataType.VECTOR) {
@@ -1693,6 +1696,7 @@ public AlignedTVListIterator(
16931696
}
16941697

16951698
@Override
1699+
@SuppressWarnings("java:S6541")
16961700
protected void prepareNext() {
16971701
// find the first row that is neither deleted nor empty (all NULL values)
16981702
findValidRow = false;
@@ -1870,6 +1874,7 @@ public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int columnIndex) {
18701874
}
18711875

18721876
@Override
1877+
@SuppressWarnings("java:S6541")
18731878
public boolean hasNextBatch() {
18741879
if (!paginationController.hasCurLimit()) {
18751880
return false;
@@ -2150,11 +2155,18 @@ private TsBlock reBuildTsBlock(
21502155
}
21512156

21522157
@Override
2158+
@SuppressWarnings("java:S6541")
21532159
public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, long[] times) {
2160+
int maxRowCountOfCurrentBatch =
2161+
Math.min(
2162+
rows - index,
2163+
Math.min(
2164+
(int) encodeInfo.maxNumberOfPointsInChunk - encodeInfo.pointNumInChunk, // NOSONAR
2165+
encodeInfo.maxNumberOfPointsInPage - encodeInfo.pointNumInPage));
21542166
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
21552167

21562168
// duplicated time or deleted time are all invalid, true if we don't need this row
2157-
BitMap timeDuplicateInfo = null;
2169+
LazyBitMap timeDuplicateInfo = null;
21582170

21592171
int startIndex = index;
21602172
// time column
@@ -2183,7 +2195,7 @@ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, lo
21832195
encodeInfo.pointNumInChunk++;
21842196
} else {
21852197
if (Objects.isNull(timeDuplicateInfo)) {
2186-
timeDuplicateInfo = new BitMap(rows);
2198+
timeDuplicateInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1);
21872199
}
21882200
timeDuplicateInfo.mark(index);
21892201
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.exception.query.QueryProcessException;
2425
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2526
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2627
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2728
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
2829
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2930
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
31+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
3032
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
3133
import org.apache.iotdb.db.utils.datastructure.TVList;
3234

@@ -41,6 +43,7 @@
4143
import org.apache.tsfile.read.filter.operator.LongFilterOperators;
4244
import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
4345
import org.apache.tsfile.read.reader.series.PaginationController;
46+
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
4447
import org.apache.tsfile.write.schema.IMeasurementSchema;
4548
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
4649
import org.junit.AfterClass;
@@ -952,4 +955,58 @@ private void testSkipTimeRange(
952955
}
953956
Assert.assertEquals(expectedTimestamps, resultTimestamps);
954957
}
958+
959+
@Test
960+
public void testEncodeBatch() {
961+
testEncodeBatch(largeSingleTvListMap, 400000);
962+
testEncodeBatch(largeOrderedMultiTvListMap, 400000);
963+
testEncodeBatch(largeMergeSortMultiTvListMap, 400000);
964+
}
965+
966+
private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) {
967+
AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
968+
List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
969+
IMeasurementSchema measurementSchema = getMeasurementSchema();
970+
AlignedReadOnlyMemChunk chunk =
971+
new AlignedReadOnlyMemChunk(
972+
fragmentInstanceContext,
973+
columnIdxList,
974+
measurementSchema,
975+
tvListMap,
976+
Collections.emptyList(),
977+
Arrays.asList(
978+
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
979+
chunk.sortTvLists();
980+
chunk.initChunkMetaFromTVListsWithFakeStatistics();
981+
MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null);
982+
BatchEncodeInfo encodeInfo =
983+
new BatchEncodeInfo(
984+
0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
985+
long[] times = new long[10000];
986+
long count = 0;
987+
while (memPointIterator.hasNextBatch()) {
988+
memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
989+
if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
990+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
991+
encodeInfo.pointNumInPage = 0;
992+
}
993+
994+
if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
995+
alignedChunkWriter.sealCurrentPage();
996+
alignedChunkWriter.clearPageWriter();
997+
count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount();
998+
alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
999+
encodeInfo.reset();
1000+
}
1001+
}
1002+
// Handle remaining data in the final unsealed chunk
1003+
if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
1004+
if (encodeInfo.pointNumInPage > 0) {
1005+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
1006+
}
1007+
alignedChunkWriter.sealCurrentPage();
1008+
count += alignedChunkWriter.getTimeChunkWriter().getStatistics().getCount();
1009+
}
1010+
Assert.assertEquals(expectedCount, count);
1011+
}
9551012
}

0 commit comments

Comments
 (0)