Skip to content

Commit e60335c

Browse files
committed
optimize memtable region scan
1 parent 0bb9e96 commit e60335c

File tree

8 files changed

+429
-84
lines changed

8 files changed

+429
-84
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2435,7 +2435,8 @@ private List<IFileScanHandle> getFileHandleListForQuery(
24352435
} else {
24362436
tsFileResource
24372437
.getProcessor()
2438-
.queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles);
2438+
.queryForSeriesRegionScanWithoutLock(
2439+
partialPaths, context, fileScanHandles, globalTimeFilter);
24392440
}
24402441
}
24412442
return fileScanHandles;
@@ -2512,7 +2513,8 @@ private List<IFileScanHandle> getFileHandleListForQuery(
25122513
} else {
25132514
tsFileResource
25142515
.getProcessor()
2515-
.queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles);
2516+
.queryForDeviceRegionScanWithoutLock(
2517+
devicePathsToContext, context, fileScanHandles, globalTimeFilter);
25162518
}
25172519
}
25182520
return fileScanHandles;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java

Lines changed: 55 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import java.util.Map;
7171
import java.util.Map.Entry;
7272
import java.util.Objects;
73+
import java.util.Optional;
7374
import java.util.concurrent.atomic.AtomicLong;
7475
import java.util.stream.Collectors;
7576

@@ -488,7 +489,8 @@ public void queryForSeriesRegionScan(
488489
long ttlLowerBound,
489490
Map<String, List<IChunkMetadata>> chunkMetaDataMap,
490491
Map<String, List<IChunkHandle>> memChunkHandleMap,
491-
List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
492+
List<Pair<ModEntry, IMemTable>> modsToMemTabled,
493+
Filter globalTimeFilter) {
492494

493495
IDeviceID deviceID = fullPath.getDeviceId();
494496
if (fullPath instanceof NonAlignedFullPath) {
@@ -506,7 +508,12 @@ public void queryForSeriesRegionScan(
506508
fullPath.getDeviceId(), measurementId, this, modsToMemTabled, ttlLowerBound);
507509
}
508510
getMemChunkHandleFromMemTable(
509-
deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList);
511+
deviceID,
512+
measurementId,
513+
chunkMetaDataMap,
514+
memChunkHandleMap,
515+
deletionList,
516+
globalTimeFilter);
510517
} else {
511518
// check If MemTable Contains this path
512519
if (!memTableMap.containsKey(deviceID)) {
@@ -528,7 +535,8 @@ public void queryForSeriesRegionScan(
528535
((AlignedFullPath) fullPath).getSchemaList(),
529536
chunkMetaDataMap,
530537
memChunkHandleMap,
531-
deletionList);
538+
deletionList,
539+
globalTimeFilter);
532540
}
533541
}
534542

@@ -539,7 +547,8 @@ public void queryForDeviceRegionScan(
539547
long ttlLowerBound,
540548
Map<String, List<IChunkMetadata>> chunkMetadataMap,
541549
Map<String, List<IChunkHandle>> memChunkHandleMap,
542-
List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
550+
List<Pair<ModEntry, IMemTable>> modsToMemTabled,
551+
Filter globalTimeFilter) {
543552

544553
Map<IDeviceID, IWritableMemChunkGroup> memTableMap = getMemTableMap();
545554

@@ -556,15 +565,17 @@ public void queryForDeviceRegionScan(
556565
chunkMetadataMap,
557566
memChunkHandleMap,
558567
ttlLowerBound,
559-
modsToMemTabled);
568+
modsToMemTabled,
569+
globalTimeFilter);
560570
} else {
561571
getMemChunkHandleFromMemTable(
562572
deviceID,
563573
(WritableMemChunkGroup) writableMemChunkGroup,
564574
chunkMetadataMap,
565575
memChunkHandleMap,
566576
ttlLowerBound,
567-
modsToMemTabled);
577+
modsToMemTabled,
578+
globalTimeFilter);
568579
}
569580
}
570581

@@ -573,32 +584,36 @@ private void getMemChunkHandleFromMemTable(
573584
String measurementId,
574585
Map<String, List<IChunkMetadata>> chunkMetadataMap,
575586
Map<String, List<IChunkHandle>> memChunkHandleMap,
576-
List<TimeRange> deletionList) {
587+
List<TimeRange> deletionList,
588+
Filter globalTimeFilter) {
577589

578590
WritableMemChunk memChunk =
579591
(WritableMemChunk) memTableMap.get(deviceID).getMemChunkMap().get(measurementId);
580592

581-
long[] timestamps = memChunk.getFilteredTimestamp(deletionList);
593+
Optional<Long> anySatisfiedTimestamp =
594+
memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
595+
if (!anySatisfiedTimestamp.isPresent()) {
596+
return;
597+
}
598+
long satisfiedTimestamp = anySatisfiedTimestamp.get();
582599

583600
chunkMetadataMap
584601
.computeIfAbsent(measurementId, k -> new ArrayList<>())
585602
.add(
586-
buildChunkMetaDataForMemoryChunk(
587-
measurementId,
588-
timestamps[0],
589-
timestamps[timestamps.length - 1],
590-
Collections.emptyList()));
603+
buildFakeChunkMetaDataForFakeMemoryChunk(
604+
measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList()));
591605
memChunkHandleMap
592606
.computeIfAbsent(measurementId, k -> new ArrayList<>())
593-
.add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
607+
.add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp}));
594608
}
595609

596610
private void getMemAlignedChunkHandleFromMemTable(
597611
IDeviceID deviceID,
598612
List<IMeasurementSchema> schemaList,
599613
Map<String, List<IChunkMetadata>> chunkMetadataList,
600614
Map<String, List<IChunkHandle>> memChunkHandleMap,
601-
List<List<TimeRange>> deletionList) {
615+
List<List<TimeRange>> deletionList,
616+
Filter globalTimeFilter) {
602617

603618
AlignedWritableMemChunk alignedMemChunk =
604619
((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk();
@@ -615,7 +630,11 @@ private void getMemAlignedChunkHandleFromMemTable(
615630
}
616631

617632
List<BitMap> bitMaps = new ArrayList<>();
618-
long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, bitMaps, true);
633+
long[] timestamps =
634+
alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter);
635+
if (timestamps.length == 0) {
636+
return;
637+
}
619638

620639
buildAlignedMemChunkHandle(
621640
deviceID,
@@ -633,7 +652,8 @@ private void getMemAlignedChunkHandleFromMemTable(
633652
Map<String, List<IChunkMetadata>> chunkMetadataList,
634653
Map<String, List<IChunkHandle>> memChunkHandleMap,
635654
long ttlLowerBound,
636-
List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
655+
List<Pair<ModEntry, IMemTable>> modsToMemTabled,
656+
Filter globalTimeFilter) {
637657

638658
AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk();
639659
List<IMeasurementSchema> schemaList = memChunk.getSchemaList();
@@ -648,7 +668,11 @@ private void getMemAlignedChunkHandleFromMemTable(
648668
}
649669

650670
List<BitMap> bitMaps = new ArrayList<>();
651-
long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps, true);
671+
long[] timestamps =
672+
memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter);
673+
if (timestamps.length == 0) {
674+
return;
675+
}
652676
buildAlignedMemChunkHandle(
653677
deviceID,
654678
timestamps,
@@ -665,7 +689,8 @@ private void getMemChunkHandleFromMemTable(
665689
Map<String, List<IChunkMetadata>> chunkMetadataMap,
666690
Map<String, List<IChunkHandle>> memChunkHandleMap,
667691
long ttlLowerBound,
668-
List<Pair<ModEntry, IMemTable>> modsToMemTabled) {
692+
List<Pair<ModEntry, IMemTable>> modsToMemTabled,
693+
Filter globalTimeFilter) {
669694

670695
for (Entry<String, IWritableMemChunk> entry :
671696
writableMemChunkGroup.getMemChunkMap().entrySet()) {
@@ -679,18 +704,20 @@ private void getMemChunkHandleFromMemTable(
679704
ModificationUtils.constructDeletionList(
680705
deviceID, measurementId, this, modsToMemTabled, ttlLowerBound);
681706
}
682-
long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList);
707+
Optional<Long> anySatisfiedTimestamp =
708+
writableMemChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter);
709+
if (!anySatisfiedTimestamp.isPresent()) {
710+
return;
711+
}
712+
long satisfiedTimestamp = anySatisfiedTimestamp.get();
683713
chunkMetadataMap
684714
.computeIfAbsent(measurementId, k -> new ArrayList<>())
685715
.add(
686-
buildChunkMetaDataForMemoryChunk(
687-
measurementId,
688-
timestamps[0],
689-
timestamps[timestamps.length - 1],
690-
Collections.emptyList()));
716+
buildFakeChunkMetaDataForFakeMemoryChunk(
717+
measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList()));
691718
memChunkHandleMap
692719
.computeIfAbsent(measurementId, k -> new ArrayList<>())
693-
.add(new MemChunkHandleImpl(deviceID, measurementId, timestamps));
720+
.add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp}));
694721
}
695722
}
696723

@@ -714,7 +741,7 @@ private void buildAlignedMemChunkHandle(
714741
chunkMetadataList
715742
.computeIfAbsent(measurement, k -> new ArrayList<>())
716743
.add(
717-
buildChunkMetaDataForMemoryChunk(
744+
buildFakeChunkMetaDataForFakeMemoryChunk(
718745
measurement, startEndTime[0], startEndTime[1], deletion));
719746
chunkHandleMap
720747
.computeIfAbsent(measurement, k -> new ArrayList<>())
@@ -745,7 +772,7 @@ private long[] calculateStartEndTime(long[] timestamps, List<BitMap> bitMaps, in
745772
return new long[] {startTime, endTime};
746773
}
747774

748-
private IChunkMetadata buildChunkMetaDataForMemoryChunk(
775+
private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk(
749776
String measurement, long startTime, long endTime, List<TimeRange> deletionList) {
750777
TimeStatistics timeStatistics = new TimeStatistics();
751778
timeStatistics.setStartTime(startTime);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.tsfile.encrypt.EncryptUtils;
3535
import org.apache.tsfile.enums.TSDataType;
3636
import org.apache.tsfile.read.common.TimeRange;
37+
import org.apache.tsfile.read.filter.basic.Filter;
3738
import org.apache.tsfile.utils.Binary;
3839
import org.apache.tsfile.utils.BitMap;
3940
import org.apache.tsfile.utils.Pair;
@@ -55,6 +56,7 @@
5556
import java.util.Set;
5657
import java.util.TreeMap;
5758
import java.util.concurrent.BlockingQueue;
59+
import java.util.concurrent.atomic.AtomicInteger;
5860

5961
import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted;
6062

@@ -287,13 +289,63 @@ private Pair<Object[], BitMap[]> checkAndReorderColumnValuesInInsertPlan(
287289
return new Pair<>(reorderedColumnValues, reorderedBitMaps);
288290
}
289291

290-
private void filterDeletedTimeStamp(
292+
public long[] getAnySatisfiedTimestamp(
293+
List<List<TimeRange>> deletionList,
294+
List<BitMap> bitMaps,
295+
boolean ignoreAllNullRows,
296+
Filter globalTimeFilter) {
297+
BitMap columnHasNonNullValue = new BitMap(schemaList.size());
298+
AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0);
299+
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
300+
301+
getAnySatisfiedTimestamp(
302+
list,
303+
deletionList,
304+
ignoreAllNullRows,
305+
timestampWithBitmap,
306+
globalTimeFilter,
307+
columnHasNonNullValue,
308+
hasNonNullValueColumnCount);
309+
for (int i = 0;
310+
i < sortedList.size() && hasNonNullValueColumnCount.get() < schemaList.size();
311+
i++) {
312+
if (!ignoreAllNullRows && !timestampWithBitmap.isEmpty()) {
313+
// count devices in table model
314+
break;
315+
}
316+
getAnySatisfiedTimestamp(
317+
sortedList.get(i),
318+
deletionList,
319+
ignoreAllNullRows,
320+
timestampWithBitmap,
321+
globalTimeFilter,
322+
columnHasNonNullValue,
323+
hasNonNullValueColumnCount);
324+
}
325+
326+
long[] timestamps = new long[timestampWithBitmap.size()];
327+
int idx = 0;
328+
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
329+
timestamps[idx++] = entry.getKey();
330+
bitMaps.add(entry.getValue());
331+
}
332+
return timestamps;
333+
}
334+
335+
private void getAnySatisfiedTimestamp(
291336
AlignedTVList alignedTVList,
292337
List<List<TimeRange>> valueColumnsDeletionList,
293338
boolean ignoreAllNullRows,
294-
Map<Long, BitMap> timestampWithBitmap) {
339+
Map<Long, BitMap> timestampWithBitmap,
340+
Filter globalTimeFilter,
341+
BitMap columnHasNonNullValue,
342+
AtomicInteger hasNonNullValueColumnCount) {
343+
if (globalTimeFilter != null
344+
&& !globalTimeFilter.satisfyStartEndTime(
345+
alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
346+
return;
347+
}
295348
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
296-
297349
int rowCount = alignedTVList.rowCount();
298350
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
299351
if (valueColumnsDeletionList != null) {
@@ -306,11 +358,17 @@ private void filterDeletedTimeStamp(
306358
continue;
307359
}
308360
long timestamp = alignedTVList.getTime(row);
361+
if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, null)) {
362+
continue;
363+
}
309364

310365
BitMap bitMap = new BitMap(schemaList.size());
366+
367+
boolean foundAnyNewColumnWithNonNullValue = false;
311368
for (int column = 0; column < schemaList.size(); column++) {
312369
if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) {
313370
bitMap.mark(column);
371+
continue;
314372
}
315373

316374
// skip deleted row
@@ -321,32 +379,30 @@ && isPointDeleted(
321379
valueColumnsDeletionList.get(column),
322380
valueColumnDeleteCursor.get(column))) {
323381
bitMap.mark(column);
382+
continue;
324383
}
325384

326-
// skip all-null row
327-
if (ignoreAllNullRows && bitMap.isAllMarked()) {
328-
continue;
385+
if (!columnHasNonNullValue.isMarked(column)) {
386+
hasNonNullValueColumnCount.incrementAndGet();
387+
foundAnyNewColumnWithNonNullValue = true;
388+
columnHasNonNullValue.mark(column);
329389
}
330-
timestampWithBitmap.put(timestamp, bitMap);
331390
}
332-
}
333-
}
334-
335-
public long[] getFilteredTimestamp(
336-
List<List<TimeRange>> deletionList, List<BitMap> bitMaps, boolean ignoreAllNullRows) {
337-
Map<Long, BitMap> timestampWithBitmap = new TreeMap<>();
338391

339-
filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap);
340-
for (AlignedTVList alignedTVList : sortedList) {
341-
filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap);
342-
}
392+
if (!ignoreAllNullRows) {
393+
// count devices in table model
394+
timestampWithBitmap.put(timestamp, bitMap);
395+
return;
396+
}
397+
if (!foundAnyNewColumnWithNonNullValue) {
398+
continue;
399+
}
400+
timestampWithBitmap.put(timestamp, bitMap);
343401

344-
List<Long> filteredTimestamps = new ArrayList<>();
345-
for (Map.Entry<Long, BitMap> entry : timestampWithBitmap.entrySet()) {
346-
filteredTimestamps.add(entry.getKey());
347-
bitMaps.add(entry.getValue());
402+
if (hasNonNullValueColumnCount.get() == schemaList.size()) {
403+
return;
404+
}
348405
}
349-
return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
350406
}
351407

352408
@Override

0 commit comments

Comments
 (0)