Skip to content

Commit 9be174b

Browse files
authored
[to dev/1.3] Optimize memtable region scan #16891
1 parent abb25f4 commit 9be174b

File tree

7 files changed

+494
-86
lines changed

7 files changed

+494
-86
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2161,7 +2161,8 @@ private List<IFileScanHandle> getFileHandleListForQuery(
21612161
} else {
21622162
tsFileResource
21632163
.getProcessor()
2164-
.queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles);
2164+
.queryForSeriesRegionScanWithoutLock(
2165+
partialPaths, context, fileScanHandles, globalTimeFilter);
21652166
}
21662167
}
21672168
return fileScanHandles;
@@ -2238,7 +2239,8 @@ private List<IFileScanHandle> getFileHandleListForQuery(
22382239
} else {
22392240
tsFileResource
22402241
.getProcessor()
2241-
.queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles);
2242+
.queryForDeviceRegionScanWithoutLock(
2243+
devicePathsToContext, context, fileScanHandles, globalTimeFilter);
22422244
}
22432245
}
22442246
return fileScanHandles;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.Map;
6868
import java.util.Map.Entry;
6969
import java.util.Objects;
70+
import java.util.Optional;
7071
import java.util.concurrent.atomic.AtomicLong;
7172
import java.util.stream.Collectors;
7273

@@ -451,7 +452,8 @@ public void queryForSeriesRegionScan(
451452
long ttlLowerBound,
452453
Map<String, List<IChunkMetadata>> chunkMetaDataMap,
453454
Map<String, List<IChunkHandle>> memChunkHandleMap,
454-
List<Pair<Modification, IMemTable>> modsToMemTabled) {
455+
List<Pair<Modification, IMemTable>> modsToMemTabled,
456+
Filter globalTimeFilter) {
455457

456458
IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(fullPath.getDevicePath());
457459

@@ -469,7 +471,12 @@ public void queryForSeriesRegionScan(
469471
(MeasurementPath) fullPath, this, modsToMemTabled, ttlLowerBound);
470472
}
471473
getMemChunkHandleFromMemTable(
472-
deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList);
474+
deviceID,
475+
measurementId,
476+
chunkMetaDataMap,
477+
memChunkHandleMap,
478+
deletionList,
479+
globalTimeFilter);
473480
} else {
474481
if (!memTableMap.containsKey(deviceID)) {
475482
return;
@@ -486,7 +493,8 @@ public void queryForSeriesRegionScan(
486493
((AlignedPath) fullPath).getSchemaList(),
487494
chunkMetaDataMap,
488495
memChunkHandleMap,
489-
deletionList);
496+
deletionList,
497+
globalTimeFilter);
490498
}
491499
}
492500

@@ -497,7 +505,8 @@ public void queryForDeviceRegionScan(
497505
long ttlLowerBound,
498506
Map<String, List<IChunkMetadata>> chunkMetadataMap,
499507
Map<String, List<IChunkHandle>> memChunkHandleMap,
500-
List<Pair<Modification, IMemTable>> modsToMemTabled)
508+
List<Pair<Modification, IMemTable>> modsToMemTabled,
509+
Filter globalTimeFilter)
501510
throws IllegalPathException {
502511

503512
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap();
@@ -515,15 +524,17 @@ public void queryForDeviceRegionScan(
515524
chunkMetadataMap,
516525
memChunkHandleMap,
517526
ttlLowerBound,
518-
modsToMemTabled);
527+
modsToMemTabled,
528+
globalTimeFilter);
519529
} else {
520530
getMemChunkHandleFromMemTable(
521531
deviceID,
522532
(WritableMemChunkGroup) writableMemChunkGroup,
523533
chunkMetadataMap,
524534
memChunkHandleMap,
525535
ttlLowerBound,
526-
modsToMemTabled);
536+
modsToMemTabled,
537+
globalTimeFilter);
527538
}
528539
}
529540

@@ -532,32 +543,39 @@ private void getMemChunkHandleFromMemTable(
532543
String measurementId,
533544
Map<String, List<IChunkMetadata>> chunkMetadataMap,
534545
Map<String, List<IChunkHandle>> memChunkHandleMap,
535-
List<TimeRange> deletionList) {
546+
List<TimeRange> deletionList,
547+
Filter globalTimeFilter) {
536548

537549
WritableMemChunk memChunk =
538550
(WritableMemChunk) memTableMap.get(deviceID).getMemChunkMap().get(measurementId);
539551

540-
long[] timestamps = memChunk.getFilteredTimestamp(deletionList);
552+
if (memChunk == null) {
553+
return;
554+
}
555+
Optional<Long> anySatisfiedTimestamp =
556+
memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
557+
if (!anySatisfiedTimestamp.isPresent()) {
558+
return;
559+
}
560+
long satisfiedTimestamp = anySatisfiedTimestamp.get();
541561

542562
chunkMetadataMap
543563
.computeIfAbsent(measurementId, k -> new ArrayList<>())
544564
.add(
545-
buildChunkMetaDataForMemoryChunk(
546-
measurementId,
547-
timestamps[0],
548-
timestamps[timestamps.length - 1],
549-
Collections.emptyList()));
565+
buildFakeChunkMetaDataForFakeMemoryChunk(
566+
measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList()));
550567
memChunkHandleMap
551568
.computeIfAbsent(measurementId, k -> new ArrayList<>())
552-
.add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
569+
.add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp}));
553570
}
554571

555572
private void getMemAlignedChunkHandleFromMemTable(
556573
IDeviceID deviceID,
557574
List<IMeasurementSchema> schemaList,
558575
Map<String, List<IChunkMetadata>> chunkMetadataList,
559576
Map<String, List<IChunkHandle>> memChunkHandleMap,
560-
List<List<TimeRange>> deletionList) {
577+
List<List<TimeRange>> deletionList,
578+
Filter globalTimeFilter) {
561579

562580
AlignedWritableMemChunk alignedMemChunk =
563581
((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk();
@@ -574,7 +592,11 @@ private void getMemAlignedChunkHandleFromMemTable(
574592
}
575593

576594
List<BitMap> bitMaps = new ArrayList<>();
577-
long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, bitMaps);
595+
long[] timestamps =
596+
alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, globalTimeFilter);
597+
if (timestamps.length == 0) {
598+
return;
599+
}
578600

579601
buildAlignedMemChunkHandle(
580602
deviceID,
@@ -592,7 +614,8 @@ private void getMemAlignedChunkHandleFromMemTable(
592614
Map<String, List<IChunkMetadata>> chunkMetadataList,
593615
Map<String, List<IChunkHandle>> memChunkHandleMap,
594616
long ttlLowerBound,
595-
List<Pair<Modification, IMemTable>> modsToMemTabled)
617+
List<Pair<Modification, IMemTable>> modsToMemTabled,
618+
Filter globalTimeFilter)
596619
throws IllegalPathException {
597620

598621
AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk();
@@ -611,7 +634,10 @@ private void getMemAlignedChunkHandleFromMemTable(
611634
}
612635

613636
List<BitMap> bitMaps = new ArrayList<>();
614-
long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps);
637+
long[] timestamps = memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, globalTimeFilter);
638+
if (timestamps.length == 0) {
639+
return;
640+
}
615641
buildAlignedMemChunkHandle(
616642
deviceID,
617643
timestamps,
@@ -628,7 +654,8 @@ private void getMemChunkHandleFromMemTable(
628654
Map<String, List<IChunkMetadata>> chunkMetadataMap,
629655
Map<String, List<IChunkHandle>> memChunkHandleMap,
630656
long ttlLowerBound,
631-
List<Pair<Modification, IMemTable>> modsToMemTabled)
657+
List<Pair<Modification, IMemTable>> modsToMemTabled,
658+
Filter globalTimeFilter)
632659
throws IllegalPathException {
633660

634661
for (Entry<String, IWritableMemChunk> entry :
@@ -646,18 +673,20 @@ private void getMemChunkHandleFromMemTable(
646673
modsToMemTabled,
647674
ttlLowerBound);
648675
}
649-
long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList);
676+
Optional<Long> anySatisfiedTimestamp =
677+
writableMemChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
678+
if (!anySatisfiedTimestamp.isPresent()) {
679+
return;
680+
}
681+
long satisfiedTimestamp = anySatisfiedTimestamp.get();
650682
chunkMetadataMap
651683
.computeIfAbsent(measurementId, k -> new ArrayList<>())
652684
.add(
653-
buildChunkMetaDataForMemoryChunk(
654-
measurementId,
655-
timestamps[0],
656-
timestamps[timestamps.length - 1],
657-
Collections.emptyList()));
685+
buildFakeChunkMetaDataForFakeMemoryChunk(
686+
measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList()));
658687
memChunkHandleMap
659688
.computeIfAbsent(measurementId, k -> new ArrayList<>())
660-
.add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
689+
.add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp}));
661690
}
662691
}
663692

@@ -681,7 +710,7 @@ private void buildAlignedMemChunkHandle(
681710
chunkMetadataList
682711
.computeIfAbsent(measurement, k -> new ArrayList<>())
683712
.add(
684-
buildChunkMetaDataForMemoryChunk(
713+
buildFakeChunkMetaDataForFakeMemoryChunk(
685714
measurement, startEndTime[0], startEndTime[1], deletion));
686715
chunkHandleMap
687716
.computeIfAbsent(measurement, k -> new ArrayList<>())
@@ -712,7 +741,7 @@ private long[] calculateStartEndTime(long[] timestamps, List<BitMap> bitMaps, in
712741
return new long[] {startTime, endTime};
713742
}
714743

715-
private IChunkMetadata buildChunkMetaDataForMemoryChunk(
744+
private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk(
716745
String measurement, long startTime, long endTime, List<TimeRange> deletionList) {
717746
TimeStatistics timeStatistics = new TimeStatistics();
718747
timeStatistics.setStartTime(startTime);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import org.apache.tsfile.enums.TSDataType;
3333
import org.apache.tsfile.read.common.TimeRange;
34+
import org.apache.tsfile.read.filter.basic.Filter;
3435
import org.apache.tsfile.utils.Binary;
3536
import org.apache.tsfile.utils.BitMap;
3637
import org.apache.tsfile.utils.Pair;
@@ -52,6 +53,7 @@
5253
import java.util.Set;
5354
import java.util.TreeMap;
5455
import java.util.concurrent.BlockingQueue;
56+
import java.util.concurrent.atomic.AtomicInteger;
5557

5658
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
5759

@@ -712,29 +714,89 @@ private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
712714
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
713715
}
714716

715-
private void filterDeletedTimeStamp(
717+
public long[] getAnySatisfiedTimestamp(
718+
List<List<TimeRange>> deletionList, List<BitMap> bitMaps, Filter globalTimeFilter) {
719+
BitMap columnHasNonNullValue = new BitMap(schemaList.size());
720+
AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0);
721+
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
722+
723+
getAnySatisfiedTimestamp(
724+
list,
725+
deletionList,
726+
timestampWithBitmap,
727+
globalTimeFilter,
728+
columnHasNonNullValue,
729+
hasNonNullValueColumnCount);
730+
for (int i = 0;
731+
i < sortedList.size() && hasNonNullValueColumnCount.get() < schemaList.size();
732+
i++) {
733+
getAnySatisfiedTimestamp(
734+
sortedList.get(i),
735+
deletionList,
736+
timestampWithBitmap,
737+
globalTimeFilter,
738+
columnHasNonNullValue,
739+
hasNonNullValueColumnCount);
740+
}
741+
742+
long[] timestamps = new long[timestampWithBitmap.size()];
743+
int idx = 0;
744+
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
745+
timestamps[idx++] = entry.getKey();
746+
bitMaps.add(entry.getValue());
747+
}
748+
return timestamps;
749+
}
750+
751+
private void getAnySatisfiedTimestamp(
716752
AlignedTVList alignedTVList,
717753
List<List<TimeRange>> valueColumnsDeletionList,
718-
Map<Long, BitMap> timestampWithBitmap) {
754+
Map<Long, BitMap> timestampWithBitmap,
755+
Filter globalTimeFilter,
756+
BitMap columnHasNonNullValue,
757+
AtomicInteger hasNonNullValueColumnCount) {
758+
if (globalTimeFilter != null
759+
&& !globalTimeFilter.satisfyStartEndTime(
760+
alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
761+
return;
762+
}
719763
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
720-
721764
int rowCount = alignedTVList.rowCount();
722765
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
723766
if (valueColumnsDeletionList != null) {
724767
valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0}));
725768
}
726769

770+
// example:
771+
// globalTimeFilter:null, ignoreAllNullRows: true
772+
// tvList:
773+
// time s1 s2 s3
774+
// 1 1 null null
775+
// 2 null 1 null
776+
// 2 1 1 null
777+
// 3 1 null null
778+
// 4 1 null 1
779+
// timestampWithBitmap:
780+
// timestamp: 1 bitmap: 011
781+
// timestamp: 2 bitmap: 101
782+
// timestamp: 4 bitmap: 110
727783
for (int row = 0; row < rowCount; row++) {
728784
// the row is deleted
729785
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) {
730786
continue;
731787
}
732788
long timestamp = alignedTVList.getTime(row);
789+
if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, null)) {
790+
continue;
791+
}
792+
793+
// Note that this method will only perform bitmap unmarking on the first occurrence of a
794+
// non-null value in multiple timestamps for the same column.
795+
BitMap currentRowNullValueBitmap = null;
733796

734-
BitMap bitMap = new BitMap(schemaList.size());
735797
for (int column = 0; column < schemaList.size(); column++) {
736798
if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) {
737-
bitMap.mark(column);
799+
continue;
738800
}
739801

740802
// skip deleted row
@@ -744,32 +806,36 @@ && isPointDeleted(
744806
timestamp,
745807
valueColumnsDeletionList.get(column),
746808
valueColumnDeleteCursor.get(column))) {
747-
bitMap.mark(column);
748-
}
749-
750-
// skip all-null row
751-
if (bitMap.isAllMarked()) {
752809
continue;
753810
}
754-
timestampWithBitmap.put(timestamp, bitMap);
811+
if (!columnHasNonNullValue.isMarked(column)) {
812+
hasNonNullValueColumnCount.incrementAndGet();
813+
columnHasNonNullValue.mark(column);
814+
currentRowNullValueBitmap =
815+
currentRowNullValueBitmap != null
816+
? currentRowNullValueBitmap
817+
: timestampWithBitmap.computeIfAbsent(
818+
timestamp, k -> getAllMarkedBitmap(schemaList.size()));
819+
currentRowNullValueBitmap.unmark(column);
820+
}
755821
}
756-
}
757-
}
758822

759-
public long[] getFilteredTimestamp(List<List<TimeRange>> deletionList, List<BitMap> bitMaps) {
760-
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
823+
if (currentRowNullValueBitmap == null) {
824+
continue;
825+
}
826+
// found new column with non-null value
827+
timestampWithBitmap.put(timestamp, currentRowNullValueBitmap);
761828

762-
filterDeletedTimeStamp(list, deletionList, timestampWithBitmap);
763-
for (AlignedTVList alignedTVList : sortedList) {
764-
filterDeletedTimeStamp(alignedTVList, deletionList, timestampWithBitmap);
829+
if (hasNonNullValueColumnCount.get() == schemaList.size()) {
830+
return;
831+
}
765832
}
833+
}
766834

767-
List<Long> filteredTimestamps = new ArrayList<>();
768-
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
769-
filteredTimestamps.add(entry.getKey());
770-
bitMaps.add(entry.getValue());
771-
}
772-
return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
835+
private BitMap getAllMarkedBitmap(int size) {
836+
BitMap bitMap = new BitMap(size);
837+
bitMap.markAll();
838+
return bitMap;
773839
}
774840

775841
// Choose maximum avgPointSizeOfLargestColumn among working and sorted AlignedTVList as

0 commit comments

Comments
 (0)