Skip to content

Commit 3238ec2

Browse files
authored
Add the used memory calculation for DescPriorityMergeReader which is missed before (#14548)
1 parent 266c3b0 commit 3238ec2

File tree

7 files changed

+177
-212
lines changed

7 files changed

+177
-212
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/CachedPriorityMergeReader.java

Lines changed: 0 additions & 97 deletions
This file was deleted.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/DescPriorityMergeReader.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,13 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
2121

22-
import org.apache.tsfile.read.reader.IPointReader;
23-
24-
import java.io.IOException;
2522
import java.util.PriorityQueue;
2623

2724
public class DescPriorityMergeReader extends PriorityMergeReader {
2825

2926
public DescPriorityMergeReader() {
30-
super.heap =
27+
currentReadStopTime = Long.MAX_VALUE;
28+
heap =
3129
new PriorityQueue<>(
3230
(o1, o2) -> {
3331
int timeCompare =
@@ -37,13 +35,7 @@ public DescPriorityMergeReader() {
3735
}
3836

3937
@Override
40-
public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime)
41-
throws IOException {
42-
if (reader.hasNextTimeValuePair()) {
43-
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
44-
super.currentReadStopTime = Math.min(currentReadStopTime, endTime);
45-
} else {
46-
reader.close();
47-
}
38+
protected void updateCurrentReadStopTime(long endTime) {
39+
currentReadStopTime = Math.min(currentReadStopTime, endTime);
4840
}
4941
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
2121

22-
import org.apache.iotdb.commons.utils.TestOnly;
2322
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2423

2524
import org.apache.tsfile.read.TimeValuePair;
@@ -34,7 +33,7 @@ public class PriorityMergeReader implements IPointReader {
3433

3534
// max time of all added readers in PriorityMergeReader
3635
// or min time of all added readers in DescPriorityMergeReader
37-
protected long currentReadStopTime;
36+
protected long currentReadStopTime = Long.MIN_VALUE;
3837

3938
protected PriorityQueue<Element> heap;
4039

@@ -57,25 +56,12 @@ public void setMemoryReservationManager(MemoryReservationManager memoryReservati
5756
this.memoryReservationManager = memoryReservationManager;
5857
}
5958

60-
@TestOnly
61-
public void addReader(IPointReader reader, long priority) throws IOException {
62-
if (reader.hasNextTimeValuePair()) {
63-
heap.add(
64-
new Element(
65-
reader,
66-
reader.nextTimeValuePair(),
67-
new MergeReaderPriority(Long.MAX_VALUE, priority, 0, false)));
68-
} else {
69-
reader.close();
70-
}
71-
}
72-
7359
public void addReader(IPointReader reader, MergeReaderPriority priority, long endTime)
7460
throws IOException {
7561
if (reader.hasNextTimeValuePair()) {
7662
Element element = new Element(reader, reader.nextTimeValuePair(), priority);
7763
heap.add(element);
78-
currentReadStopTime = Math.max(currentReadStopTime, endTime);
64+
updateCurrentReadStopTime(endTime);
7965
long size = element.getReader().getUsedMemorySize();
8066
usedMemorySize += size;
8167
if (memoryReservationManager != null) {
@@ -86,6 +72,10 @@ public void addReader(IPointReader reader, MergeReaderPriority priority, long en
8672
}
8773
}
8874

75+
protected void updateCurrentReadStopTime(long endTime) {
76+
currentReadStopTime = Math.max(currentReadStopTime, endTime);
77+
}
78+
8979
public long getCurrentReadStopTime() {
9080
return currentReadStopTime;
9181
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/FakedSeriesReader.java renamed to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/AscFakedSeriesReader.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,31 @@
2525

2626
import java.io.IOException;
2727

28-
public class FakedSeriesReader implements IPointReader {
28+
public class AscFakedSeriesReader implements IPointReader {
2929

30-
private int index;
31-
private int size;
32-
private boolean initWithTimeList;
33-
private static final TSDataType DATA_TYPE = TSDataType.INT64;
30+
protected int index;
31+
protected int size;
32+
protected boolean initWithTimeList;
33+
protected static final TSDataType DATA_TYPE = TSDataType.INT64;
3434

3535
// init with time list and value
36-
private long[] timestamps;
37-
private long value;
36+
protected long[] timestamps;
37+
protected long value;
3838

3939
// init with startTime, size, interval and modValue
40-
private long startTime;
41-
private int interval;
42-
private int modValue;
40+
protected long startTime;
41+
protected int interval;
42+
protected int modValue;
4343

44-
public FakedSeriesReader(long[] timestamps, long value) {
44+
public AscFakedSeriesReader(long[] timestamps, long value) {
4545
this.initWithTimeList = true;
4646
this.index = 0;
4747
this.size = timestamps.length;
4848
this.timestamps = timestamps;
4949
this.value = value;
5050
}
5151

52-
public FakedSeriesReader(long startTime, int size, int interval, int modValue) {
52+
public AscFakedSeriesReader(long startTime, int size, int interval, int modValue) {
5353
this.initWithTimeList = false;
5454
this.index = 0;
5555
this.size = size;
@@ -82,8 +82,8 @@ public TimeValuePair currentTimeValuePair() throws IOException {
8282

8383
@Override
8484
public long getUsedMemorySize() {
85-
// not used
86-
return 0;
85+
// use size of timestamps to mock the used memory
86+
return size;
8787
}
8888

8989
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
20+
21+
import org.apache.tsfile.enums.TSDataType;
22+
import org.apache.tsfile.read.TimeValuePair;
23+
import org.apache.tsfile.utils.TsPrimitiveType;
24+
25+
public class DescFakedSeriesReader extends AscFakedSeriesReader {
26+
27+
public DescFakedSeriesReader(long[] timestamps, long value) {
28+
super(timestamps, value);
29+
index = size - 1;
30+
}
31+
32+
public DescFakedSeriesReader(long startTime, int size, int interval, int modValue) {
33+
super(startTime, size, interval, modValue);
34+
index = size - 1;
35+
startTime = startTime + interval * size;
36+
}
37+
38+
@Override
39+
public boolean hasNextTimeValuePair() {
40+
return index >= 0;
41+
}
42+
43+
@Override
44+
public TimeValuePair nextTimeValuePair() {
45+
if (initWithTimeList) {
46+
return new TimeValuePair(timestamps[index--], TsPrimitiveType.getByType(DATA_TYPE, value));
47+
} else {
48+
long time = startTime;
49+
startTime -= interval;
50+
index--;
51+
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64, time % modValue));
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)