Skip to content

Commit 85443f8

Browse files
committed
Sort vs setOwner
1 parent dcb6235 commit 85443f8

File tree

12 files changed

+99
-48
lines changed

12 files changed

+99
-48
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,26 +869,33 @@ public void releaseResourceWhenAllDriversAreClosed() {
869869
*/
870870
private void releaseTVListOwnedByQuery() {
871871
for (TVList tvList : tvListSet) {
872+
long tvListRamSize = tvList.calculateRamSize();
872873
tvList.lockQueryList();
873874
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
874875
try {
875876
queryContextSet.remove(this);
876877
if (tvList.getOwnerQuery() == this) {
878+
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
879+
LOGGER.warn(
880+
"Release TVList owned by query: allocate size {}, release size {}",
881+
tvList.getReservedMemoryBytes(),
882+
tvListRamSize);
883+
}
877884
if (queryContextSet.isEmpty()) {
878885
if (LOGGER.isDebugEnabled()) {
879886
LOGGER.debug(
880887
"TVList {} is released by the query, FragmentInstance Id is {}",
881888
tvList,
882889
this.getId());
883890
}
884-
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
891+
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
885892
tvList.clear();
886893
} else {
887894
// Transfer memory to next query. It must be exception-safe as this method is called
888895
// during FragmentInstanceExecution cleanup. Any exception during this process could
889896
// prevent proper resource cleanup and cause memory leaks.
890897
Pair<Long, Long> releasedBytes =
891-
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
898+
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
892899
FragmentInstanceContext queryContext =
893900
(FragmentInstanceContext) queryContextSet.iterator().next();
894901
queryContext

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,4 @@ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
4343
@Override
4444
public void reserveMemoryVirtually(
4545
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
46-
47-
@Override
48-
public long getReservedMemory() {
49-
return 0;
50-
}
5146
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22-
import org.apache.iotdb.commons.utils.TestOnly;
23-
2422
import org.apache.tsfile.utils.Pair;
2523

2624
public interface MemoryReservationManager {
@@ -74,13 +72,4 @@ public interface MemoryReservationManager {
7472
* @param bytesAlreadyReserved the amount of memory that has already been reserved
7573
*/
7674
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
77-
78-
/**
79-
* Get the total amount of memory currently reserved. This includes memory that has been reserved,
80-
* plus memory pending to be reserved, minus memory pending to be released.
81-
*
82-
* @return the total amount of memory in bytes that is currently reserved
83-
*/
84-
@TestOnly
85-
long getReservedMemory();
8675
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,4 @@ public void reserveMemoryVirtually(
114114
reservedBytesInTotal += bytesAlreadyReserved;
115115
reserveMemoryCumulatively(bytesToBeReserved);
116116
}
117-
118-
@Override
119-
public long getReservedMemory() {
120-
return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal;
121-
}
122117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,4 @@ public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}
64-
65-
@Override
66-
public synchronized long getReservedMemory() {
67-
return super.getReservedMemory();
68-
}
6964
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
148148
// mutable tvlist
149149
TVList list = memChunk.getWorkingTVList();
150150
TVList cloneList = null;
151+
long tvListRamSize = list.calculateRamSize();
151152
list.lockQueryList();
152153
try {
153154
if (copyTimeFilter != null
@@ -188,7 +189,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
188189
if (firstQuery instanceof FragmentInstanceContext) {
189190
MemoryReservationManager memoryReservationManager =
190191
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
191-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
192+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
193+
list.setReservedMemoryBytes(tvListRamSize);
192194
}
193195
list.setOwnerQuery(firstQuery);
194196

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ protected void maybeReleaseTvList(TVList tvList) {
9999
}
100100

101101
private void tryReleaseTvList(TVList tvList) {
102+
long tvListRamSize = tvList.calculateRamSize();
102103
tvList.lockQueryList();
103104
try {
104105
if (tvList.getQueryContextSet().isEmpty()) {
@@ -110,7 +111,8 @@ private void tryReleaseTvList(TVList tvList) {
110111
if (firstQuery instanceof FragmentInstanceContext) {
111112
MemoryReservationManager memoryReservationManager =
112113
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
113-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
114+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
115+
tvList.setReservedMemoryBytes(tvListRamSize);
114116
}
115117
// update current TVList owner to first query in the list
116118
tvList.setOwnerQuery(firstQuery);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

22+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2223
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2324
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2425
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -113,6 +114,21 @@ public void sortTvLists() {
113114
int queryRowCount = entry.getValue();
114115
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
115116
alignedTvList.sort();
117+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
118+
alignedTvList.lockQueryList();
119+
try {
120+
FragmentInstanceContext ownerQuery =
121+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
122+
if (ownerQuery != null) {
123+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
124+
if (deltaBytes > 0) {
125+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
126+
alignedTvList.addReservedMemoryBytes(deltaBytes);
127+
}
128+
}
129+
} finally {
130+
alignedTvList.unlockQueryList();
131+
}
116132
}
117133
}
118134
}
@@ -339,10 +355,25 @@ public boolean isEmpty() {
339355
@Override
340356
public IPointReader getPointReader() {
341357
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
342-
AlignedTVList tvList = (AlignedTVList) entry.getKey();
358+
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
343359
int queryLength = entry.getValue();
344-
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
345-
tvList.sort();
360+
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
361+
alignedTvList.sort();
362+
long alignedTvListRamSize = alignedTvList.calculateRamSize();
363+
alignedTvList.lockQueryList();
364+
try {
365+
FragmentInstanceContext ownerQuery =
366+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
367+
if (ownerQuery != null) {
368+
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
369+
if (deltaBytes > 0) {
370+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
371+
alignedTvList.addReservedMemoryBytes(deltaBytes);
372+
}
373+
}
374+
} finally {
375+
alignedTvList.unlockQueryList();
376+
}
346377
}
347378
}
348379
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
2323
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2425
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2526
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2627
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,20 @@ public void sortTvLists() {
135136
int queryRowCount = entry.getValue();
136137
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
137138
tvList.sort();
139+
long tvListRamSize = tvList.calculateRamSize();
140+
tvList.lockQueryList();
141+
try {
142+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
143+
if (ownerQuery != null) {
144+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
145+
if (deltaBytes > 0) {
146+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
147+
tvList.addReservedMemoryBytes(deltaBytes);
148+
}
149+
}
150+
} finally {
151+
tvList.unlockQueryList();
152+
}
138153
}
139154
}
140155
}
@@ -273,6 +288,20 @@ public IPointReader getPointReader() {
273288
int queryLength = entry.getValue();
274289
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
275290
tvList.sort();
291+
long tvListRamSize = tvList.calculateRamSize();
292+
tvList.lockQueryList();
293+
try {
294+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
295+
if (ownerQuery != null) {
296+
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
297+
if (deltaBytes > 0) {
298+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
299+
tvList.addReservedMemoryBytes(deltaBytes);
300+
}
301+
}
302+
} finally {
303+
tvList.unlockQueryList();
304+
}
276305
}
277306
}
278307
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ public TSDataType getDataType() {
836836
}
837837

838838
@Override
839-
public long calculateRamSize() {
839+
public synchronized long calculateRamSize() {
840840
return timestamps.size() * alignedTvListArrayMemCost();
841841
}
842842

0 commit comments

Comments
 (0)