Skip to content

Commit 2ae06e8

Browse files
authored
Optimize memtable scan
1 parent fe96a55 commit 2ae06e8

File tree

13 files changed

+806
-52
lines changed

13 files changed

+806
-52
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
5858

5959
import org.apache.tsfile.file.metadata.IDeviceID;
60+
import org.apache.tsfile.read.common.TimeRange;
6061
import org.apache.tsfile.read.filter.basic.Filter;
6162
import org.apache.tsfile.read.filter.factory.FilterFactory;
6263
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -66,6 +67,7 @@
6667
import java.time.ZoneId;
6768
import java.time.format.DateTimeParseException;
6869
import java.util.ArrayList;
70+
import java.util.Collections;
6971
import java.util.HashSet;
7072
import java.util.List;
7173
import java.util.Map;
@@ -99,6 +101,7 @@ public class FragmentInstanceContext extends QueryContext {
99101

100102
protected IDataRegionForQuery dataRegion;
101103
private Filter globalTimeFilter;
104+
private List<TimeRange> globalTimeFilterTimeRanges;
102105

103106
// it will only be used once, after sharedQueryDataSource being inited, it will be set to null
104107
private List<IFullPath> sourcePaths;
@@ -517,6 +520,23 @@ public Filter getGlobalTimeFilter() {
517520
return globalTimeFilter;
518521
}
519522

523+
public List<TimeRange> getGlobalTimeFilterTimeRanges() {
524+
if (globalTimeFilter == null) {
525+
return Collections.singletonList(new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE));
526+
}
527+
List<TimeRange> local = globalTimeFilterTimeRanges;
528+
if (local == null) {
529+
synchronized (this) {
530+
local = globalTimeFilterTimeRanges;
531+
if (local == null) {
532+
local = globalTimeFilter.getTimeRanges();
533+
globalTimeFilterTimeRanges = local;
534+
}
535+
}
536+
}
537+
return local;
538+
}
539+
520540
public void setTimeFilterForTableModel(Filter timeFilter) {
521541
if (globalTimeFilter == null) {
522542
globalTimeFilter = timeFilter;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemPageReader;
3636
import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.DescPriorityMergeReader;
3737
import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeReaderPriority;
38+
import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.NoDataPointReader;
3839
import org.apache.iotdb.db.storageengine.dataregion.read.reader.common.PriorityMergeReader;
3940
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
4041
import org.apache.iotdb.db.utils.datastructure.MemPointIterator;
@@ -134,7 +135,10 @@ public class SeriesScanUtil implements Accountable {
134135
+ RamUsageEstimator.shallowSizeOfInstance(IDeviceID.class)
135136
+ RamUsageEstimator.shallowSizeOfInstance(TimeOrderUtils.class)
136137
+ RamUsageEstimator.shallowSizeOfInstance(PaginationController.class)
137-
+ RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class);
138+
+ RamUsageEstimator.shallowSizeOfInstance(SeriesScanOptions.class)
139+
+ RamUsageEstimator.shallowSizeOfInstance(TimeRange.class);
140+
141+
protected TimeRange satisfiedTimeRange;
138142

139143
public SeriesScanUtil(
140144
IFullPath seriesPath,
@@ -214,6 +218,20 @@ public void initQueryDataSource(QueryDataSource dataSource) {
214218
// init file index
215219
orderUtils.setCurSeqFileIndex(dataSource);
216220
curUnseqFileIndex = 0;
221+
222+
if (satisfiedTimeRange == null) {
223+
long startTime = Long.MAX_VALUE;
224+
long endTime = Long.MIN_VALUE;
225+
if (scanOptions.getGlobalTimeFilter() == null) {
226+
satisfiedTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE);
227+
return;
228+
}
229+
for (TimeRange timeRange : context.getGlobalTimeFilterTimeRanges()) {
230+
startTime = Math.min(startTime, timeRange.getMin());
231+
endTime = Math.max(endTime, timeRange.getMax());
232+
}
233+
satisfiedTimeRange = new TimeRange(startTime, endTime);
234+
}
217235
}
218236

219237
protected PriorityMergeReader getPriorityMergeReader() {
@@ -681,6 +699,13 @@ private void unpackOneFakeMemChunkMetaData(
681699
readOnlyMemChunk.createMemPointIterator(
682700
orderUtils.getScanOrder(), scanOptions.getGlobalTimeFilter());
683701
for (Statistics<? extends Serializable> statistics : statisticsList) {
702+
long orderTime = orderUtils.getOrderTime(statistics);
703+
boolean canSkip =
704+
(orderUtils.getAscending() && orderTime > satisfiedTimeRange.getMax())
705+
|| (!orderUtils.getAscending() && orderTime < satisfiedTimeRange.getMin());
706+
if (canSkip) {
707+
break;
708+
}
684709
IVersionPageReader versionPageReader =
685710
new LazyMemVersionPageReader(
686711
context,
@@ -1458,6 +1483,7 @@ protected static class LazyMemVersionPageReader implements IVersionPageReader {
14581483
protected final boolean isSeq;
14591484
protected final boolean isAligned;
14601485
private boolean inited = false;
1486+
private boolean hasData = true;
14611487

14621488
LazyMemVersionPageReader(
14631489
QueryContext context,
@@ -1477,11 +1503,14 @@ protected static class LazyMemVersionPageReader implements IVersionPageReader {
14771503
}
14781504

14791505
public IPointReader getPointReader() {
1506+
if (!hasData) {
1507+
return NoDataPointReader.getInstance();
1508+
}
14801509
return memPointIterator;
14811510
}
14821511

14831512
public boolean hasNextBatch() {
1484-
return memPointIterator.hasNextBatch();
1513+
return hasData && memPointIterator.hasNextBatch();
14851514
}
14861515

14871516
public void setCurrentPageTimeRangeToMemPointIterator() {
@@ -1490,10 +1519,34 @@ public void setCurrentPageTimeRangeToMemPointIterator() {
14901519
}
14911520
if (statistics.getStartTime() > statistics.getEndTime()) {
14921521
// empty
1522+
hasData = false;
14931523
return;
14941524
}
1495-
this.memPointIterator.setCurrentPageTimeRange(
1496-
new TimeRange(statistics.getStartTime(), statistics.getEndTime()));
1525+
Filter globalTimeFilter = ((FragmentInstanceContext) context).getGlobalTimeFilter();
1526+
if (globalTimeFilter == null) {
1527+
this.memPointIterator.setCurrentPageTimeRange(
1528+
new TimeRange(statistics.getStartTime(), statistics.getEndTime()));
1529+
return;
1530+
}
1531+
1532+
long startTime = statistics.getStartTime();
1533+
long endTime = statistics.getEndTime();
1534+
long minStart = Long.MAX_VALUE;
1535+
long maxEnd = Long.MIN_VALUE;
1536+
for (TimeRange timeRange :
1537+
((FragmentInstanceContext) context).getGlobalTimeFilterTimeRanges()) {
1538+
if (timeRange.overlaps(new TimeRange(startTime, endTime))) {
1539+
minStart = Math.min(minStart, Math.max(timeRange.getMin(), startTime));
1540+
maxEnd = Math.max(maxEnd, Math.min(timeRange.getMax(), endTime));
1541+
}
1542+
}
1543+
1544+
if (minStart > maxEnd) {
1545+
hasData = false;
1546+
return;
1547+
}
1548+
1549+
this.memPointIterator.setCurrentPageTimeRange(new TimeRange(minStart, maxEnd));
14971550
}
14981551

14991552
public TsBlock nextBatch() {
@@ -1629,7 +1682,7 @@ public long getOrderTime(Statistics statistics) {
16291682
@SuppressWarnings("squid:S3740")
16301683
@Override
16311684
public long getOverlapCheckTime(Statistics range) {
1632-
return range.getStartTime();
1685+
return Math.max(satisfiedTimeRange.getMin(), range.getStartTime());
16331686
}
16341687

16351688
@SuppressWarnings("squid:S3740")
@@ -1758,7 +1811,7 @@ public long getOrderTime(Statistics statistics) {
17581811
@SuppressWarnings("squid:S3740")
17591812
@Override
17601813
public long getOverlapCheckTime(Statistics range) {
1761-
return range.getEndTime();
1814+
return Math.min(satisfiedTimeRange.getMax(), range.getEndTime());
17621815
}
17631816

17641817
@SuppressWarnings("squid:S3740")
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
21+
22+
import org.apache.tsfile.read.TimeValuePair;
23+
import org.apache.tsfile.read.reader.IPointReader;
24+
25+
import java.io.IOException;
26+
27+
public class NoDataPointReader implements IPointReader {
28+
29+
private NoDataPointReader() {}
30+
31+
private static final IPointReader instance = new NoDataPointReader();
32+
33+
public static IPointReader getInstance() {
34+
return instance;
35+
}
36+
37+
@Override
38+
public boolean hasNextTimeValuePair() throws IOException {
39+
return false;
40+
}
41+
42+
@Override
43+
public TimeValuePair nextTimeValuePair() throws IOException {
44+
return null;
45+
}
46+
47+
@Override
48+
public TimeValuePair currentTimeValuePair() throws IOException {
49+
return null;
50+
}
51+
52+
@Override
53+
public long getUsedMemorySize() {
54+
return 0;
55+
}
56+
57+
@Override
58+
public void close() throws IOException {}
59+
}

0 commit comments

Comments
 (0)