Skip to content

Commit db77455

Browse files
authored
[to dev/1.3] Fix slow query threshold & fix a bug for lastQuery & optimize encodeBatch for multi tvlist scene (#16766)
1 parent a719e49 commit db77455

File tree

4 files changed

+73
-5
lines changed

4 files changed

+73
-5
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ public class IoTDBConfig {
872872
private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
873873

874874
/** time cost(ms) threshold for slow query. Unit: millisecond */
875-
private long slowQueryThreshold = 30000;
875+
private long slowQueryThreshold = 10000;
876876

877877
private int patternMatchingThreshold = 1000000;
878878

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
157157
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
158158
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
159+
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DeviceLastCache;
159160
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
160161
import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
161162
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
@@ -2956,7 +2957,6 @@ private AlignedSeriesAggregationScanOperator createLastQueryScanOperator(
29562957

29572958
@Override
29582959
public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) {
2959-
DATA_NODE_SCHEMA_CACHE.cleanUp();
29602960
final PartialPath devicePath = node.getDevicePath();
29612961
List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas();
29622962
List<Integer> unCachedMeasurementIndexes = new ArrayList<>();
@@ -2984,7 +2984,7 @@ public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanCon
29842984

29852985
if (timeValuePair == null) { // last value is not cached
29862986
unCachedMeasurementIndexes.add(i);
2987-
} else if (timeValuePair.getValue() == null) {
2987+
} else if (timeValuePair.getValue() == DeviceLastCache.EMPTY_PRIMITIVE_TYPE) {
29882988
// there is no data for this time series, just ignore
29892989
} else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) {
29902990
// cached last value is not satisfied

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ private Object getAlignedValueForQuery(
262262
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision, encodingList);
263263
}
264264

265+
@SuppressWarnings("java:S6541")
265266
private TsPrimitiveType getAlignedValueByValueIndex(
266267
int valueIndex,
267268
int[] validIndexesForTimeDuplicatedRows,
@@ -920,6 +921,7 @@ public static long valueListArrayMemCost(TSDataType type) {
920921
}
921922

922923
/** Build TsBlock by column. */
924+
@SuppressWarnings("java:S6541")
923925
public TsBlock buildTsBlock(
924926
int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>> deletionList) {
925927
TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
@@ -1155,6 +1157,7 @@ public void serializeToWAL(IWALByteBufferView buffer) {
11551157
}
11561158
}
11571159

1160+
@SuppressWarnings("java:S6541")
11581161
public static AlignedTVList deserialize(DataInputStream stream) throws IOException {
11591162
TSDataType dataType = ReadWriteIOUtils.readDataType(stream);
11601163
if (dataType != TSDataType.VECTOR) {
@@ -1429,6 +1432,7 @@ public AlignedTVListIterator(
14291432
}
14301433

14311434
@Override
1435+
@SuppressWarnings("java:S6541")
14321436
protected void prepareNext() {
14331437
// find the first row that is neither deleted nor empty (all NULL values)
14341438
findValidRow = false;
@@ -1602,6 +1606,7 @@ public TsPrimitiveType getPrimitiveTypeObject(int rowIndex, int columnIndex) {
16021606
}
16031607

16041608
@Override
1609+
@SuppressWarnings("java:S6541")
16051610
public boolean hasNextBatch() {
16061611
if (!paginationController.hasCurLimit()) {
16071612
return false;
@@ -1884,11 +1889,18 @@ private boolean needRebuildTsBlock(boolean[] hasAnyNonNullValue) {
18841889
}
18851890

18861891
@Override
1892+
@SuppressWarnings("java:S6541")
18871893
public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, long[] times) {
1894+
int maxRowCountOfCurrentBatch =
1895+
Math.min(
1896+
rows - index,
1897+
Math.min(
1898+
(int) encodeInfo.maxNumberOfPointsInChunk - encodeInfo.pointNumInChunk, // NOSONAR
1899+
encodeInfo.maxNumberOfPointsInPage - encodeInfo.pointNumInPage));
18881900
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter;
18891901

18901902
// duplicated time or deleted time are all invalid, true if we don't need this row
1891-
BitMap timeDuplicateInfo = null;
1903+
LazyBitMap timeDuplicateInfo = null;
18921904

18931905
int startIndex = index;
18941906
// time column
@@ -1914,7 +1926,7 @@ public void encodeBatch(IChunkWriter chunkWriter, BatchEncodeInfo encodeInfo, lo
19141926
encodeInfo.pointNumInChunk++;
19151927
} else {
19161928
if (Objects.isNull(timeDuplicateInfo)) {
1917-
timeDuplicateInfo = new BitMap(rows);
1929+
timeDuplicateInfo = new LazyBitMap(index, maxRowCountOfCurrentBatch, rows - 1);
19181930
}
19191931
timeDuplicateInfo.mark(index);
19201932
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedTVListIteratorTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.exception.query.QueryProcessException;
2425
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2526
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2627
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2728
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
2829
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2930
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
31+
import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo;
3032
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
3133
import org.apache.iotdb.db.utils.datastructure.TVList;
3234

@@ -41,6 +43,7 @@
4143
import org.apache.tsfile.read.filter.operator.LongFilterOperators;
4244
import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
4345
import org.apache.tsfile.read.reader.series.PaginationController;
46+
import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
4447
import org.apache.tsfile.write.schema.IMeasurementSchema;
4548
import org.apache.tsfile.write.schema.VectorMeasurementSchema;
4649
import org.junit.AfterClass;
@@ -895,4 +898,57 @@ private void testSkipTimeRange(
895898
}
896899
Assert.assertEquals(expectedTimestamps, resultTimestamps);
897900
}
901+
902+
@Test
903+
public void testEncodeBatch() {
904+
testEncodeBatch(largeSingleTvListMap, 400000);
905+
testEncodeBatch(largeOrderedMultiTvListMap, 400000);
906+
testEncodeBatch(largeMergeSortMultiTvListMap, 400000);
907+
}
908+
909+
private void testEncodeBatch(Map<TVList, Integer> tvListMap, long expectedCount) {
910+
AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
911+
List<Integer> columnIdxList = Arrays.asList(0, 1, 2);
912+
IMeasurementSchema measurementSchema = getMeasurementSchema();
913+
AlignedReadOnlyMemChunk chunk =
914+
new AlignedReadOnlyMemChunk(
915+
fragmentInstanceContext,
916+
columnIdxList,
917+
measurementSchema,
918+
tvListMap,
919+
Arrays.asList(
920+
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
921+
chunk.sortTvLists();
922+
chunk.initChunkMetaFromTVListsWithFakeStatistics();
923+
MemPointIterator memPointIterator = chunk.createMemPointIterator(Ordering.ASC, null);
924+
BatchEncodeInfo encodeInfo =
925+
new BatchEncodeInfo(
926+
0, 0, 0, 10000, 100000, IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize());
927+
long[] times = new long[10000];
928+
long count = 0;
929+
while (memPointIterator.hasNextBatch()) {
930+
memPointIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
931+
if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
932+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
933+
encodeInfo.pointNumInPage = 0;
934+
}
935+
936+
if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
937+
alignedChunkWriter.sealCurrentPage();
938+
count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
939+
alignedChunkWriter.clearPageWriter();
940+
alignedChunkWriter = new AlignedChunkWriterImpl(getMeasurementSchema());
941+
encodeInfo.reset();
942+
}
943+
}
944+
// Handle remaining data in the final unsealed chunk
945+
if (encodeInfo.pointNumInChunk > 0 || encodeInfo.pointNumInPage > 0) {
946+
if (encodeInfo.pointNumInPage > 0) {
947+
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
948+
}
949+
alignedChunkWriter.sealCurrentPage();
950+
count += alignedChunkWriter.getTimeChunkWriter().getPointNum();
951+
}
952+
Assert.assertEquals(expectedCount, count);
953+
}
898954
}

0 commit comments

Comments
 (0)