Skip to content

Commit 7b74174

Browse files
committed
Sort vs setOwner
1 parent 2e59f9d commit 7b74174

File tree

11 files changed

+92
-45
lines changed

11 files changed

+92
-45
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -902,21 +902,28 @@ private void releaseTVListOwnedByQuery() {
902902
try {
903903
queryContextSet.remove(this);
904904
if (tvList.getOwnerQuery() == this) {
905+
long tvListRamSize = tvList.calculateRamSize();
906+
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
907+
LOGGER.warn(
908+
"Release TVList owned by query: allocate size {}, release size {}",
909+
tvList.getReservedMemoryBytes(),
910+
tvListRamSize);
911+
}
905912
if (queryContextSet.isEmpty()) {
906913
if (LOGGER.isDebugEnabled()) {
907914
LOGGER.debug(
908915
"TVList {} is released by the query, FragmentInstance Id is {}",
909916
tvList,
910917
this.getId());
911918
}
912-
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
919+
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
913920
tvList.clear();
914921
} else {
915922
// Transfer memory to next query. It must be exception-safe as this method is called
916923
// during FragmentInstanceExecution cleanup. Any exception during this process could
917924
// prevent proper resource cleanup and cause memory leaks.
918925
Pair<Long, Long> releasedBytes =
919-
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
926+
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
920927
FragmentInstanceContext queryContext =
921928
(FragmentInstanceContext) queryContextSet.iterator().next();
922929
queryContext

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,4 @@ public Pair<Long, Long> releaseMemoryVirtually(final long size) {
4343
@Override
4444
public void reserveMemoryVirtually(
4545
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
46-
47-
@Override
48-
public long getReservedMemory() {
49-
return 0;
50-
}
5146
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22-
import org.apache.iotdb.commons.utils.TestOnly;
23-
2422
import org.apache.tsfile.utils.Pair;
2523

2624
public interface MemoryReservationManager {
@@ -74,13 +72,4 @@ public interface MemoryReservationManager {
7472
* @param bytesAlreadyReserved the amount of memory that has already been reserved
7573
*/
7674
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
77-
78-
/**
79-
* Get the total amount of memory currently reserved. This includes memory that has been reserved,
80-
* plus memory pending to be reserved, minus memory pending to be released.
81-
*
82-
* @return the total amount of memory in bytes that is currently reserved
83-
*/
84-
@TestOnly
85-
long getReservedMemory();
8675
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,4 @@ public void reserveMemoryVirtually(
114114
reservedBytesInTotal += bytesAlreadyReserved;
115115
reserveMemoryCumulatively(bytesToBeReserved);
116116
}
117-
118-
@Override
119-
public long getReservedMemory() {
120-
return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal;
121-
}
122117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,4 @@ public synchronized void reserveMemoryVirtually(
6161
final long bytesToBeReserved, final long bytesAlreadyReserved) {
6262
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
6363
}
64-
65-
@Override
66-
public synchronized long getReservedMemory() {
67-
return super.getReservedMemory();
68-
}
6964
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,9 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
193193
if (firstQuery instanceof FragmentInstanceContext) {
194194
MemoryReservationManager memoryReservationManager =
195195
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
196-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
196+
long tvListRamSize = list.calculateRamSize();
197+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
198+
list.setReservedMemoryBytes(tvListRamSize);
197199
}
198200
list.setOwnerQuery(firstQuery);
199201

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ private void tryReleaseTvList(TVList tvList) {
112112
if (firstQuery instanceof FragmentInstanceContext) {
113113
MemoryReservationManager memoryReservationManager =
114114
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
115-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
115+
long tvListRamSize = tvList.calculateRamSize();
116+
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
117+
tvList.setReservedMemoryBytes(tvListRamSize);
116118
}
117119
// update current TVList owner to first query in the list
118120
tvList.setOwnerQuery(firstQuery);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.db.storageengine.dataregion.memtable;
2121

22+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2223
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2324
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2425
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -118,6 +119,21 @@ public void sortTvLists() {
118119
int queryRowCount = entry.getValue();
119120
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
120121
alignedTvList.sort();
122+
alignedTvList.lockQueryList();
123+
try {
124+
FragmentInstanceContext ownerQuery =
125+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
126+
if (ownerQuery != null) {
127+
long deltaBytes =
128+
alignedTvList.calculateRamSize() - alignedTvList.getReservedMemoryBytes();
129+
if (deltaBytes > 0) {
130+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
131+
alignedTvList.addReservedMemoryBytes(deltaBytes);
132+
}
133+
}
134+
} finally {
135+
alignedTvList.unlockQueryList();
136+
}
121137
}
122138
}
123139
}
@@ -356,10 +372,25 @@ public boolean isEmpty() {
356372
@Override
357373
public IPointReader getPointReader() {
358374
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
359-
AlignedTVList tvList = (AlignedTVList) entry.getKey();
375+
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
360376
int queryLength = entry.getValue();
361-
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
362-
tvList.sort();
377+
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
378+
alignedTvList.sort();
379+
alignedTvList.lockQueryList();
380+
try {
381+
FragmentInstanceContext ownerQuery =
382+
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
383+
if (ownerQuery != null) {
384+
long deltaBytes =
385+
alignedTvList.calculateRamSize() - alignedTvList.getReservedMemoryBytes();
386+
if (deltaBytes > 0) {
387+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
388+
alignedTvList.addReservedMemoryBytes(deltaBytes);
389+
}
390+
}
391+
} finally {
392+
alignedTvList.unlockQueryList();
393+
}
363394
}
364395
}
365396
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.utils.TestOnly;
2323
import org.apache.iotdb.db.exception.query.QueryProcessException;
24+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2425
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
2526
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2627
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,19 @@ public void sortTvLists() {
135136
int queryRowCount = entry.getValue();
136137
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
137138
tvList.sort();
139+
tvList.lockQueryList();
140+
try {
141+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
142+
if (ownerQuery != null) {
143+
long deltaBytes = tvList.calculateRamSize() - tvList.getReservedMemoryBytes();
144+
if (deltaBytes > 0) {
145+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
146+
tvList.addReservedMemoryBytes(deltaBytes);
147+
}
148+
}
149+
} finally {
150+
tvList.unlockQueryList();
151+
}
138152
}
139153
}
140154
}
@@ -274,6 +288,19 @@ public IPointReader getPointReader() {
274288
int queryLength = entry.getValue();
275289
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
276290
tvList.sort();
291+
tvList.lockQueryList();
292+
try {
293+
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
294+
if (ownerQuery != null) {
295+
long deltaBytes = tvList.calculateRamSize() - tvList.getReservedMemoryBytes();
296+
if (deltaBytes > 0) {
297+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
298+
tvList.addReservedMemoryBytes(deltaBytes);
299+
}
300+
}
301+
} finally {
302+
tvList.unlockQueryList();
303+
}
277304
}
278305
}
279306
TsBlock tsBlock = buildTsBlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.utils.TestOnly;
24-
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2524
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
26-
import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
2725
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
2826
import org.apache.iotdb.db.service.metrics.WritingMetrics;
2927
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
@@ -88,6 +86,9 @@ public abstract class TVList implements WALEntryValue {
8886
// When it is null, the TVList is owned by insert thread and released after flush.
8987
protected QueryContext ownerQuery;
9088

89+
// Reserved memory by the query. Ensure to acquire queryListLock before update.
90+
protected long reservedMemoryBytes = 0L;
91+
9192
protected boolean sorted = true;
9293
protected long maxTime;
9394
protected long minTime;
@@ -167,6 +168,18 @@ public synchronized boolean isSorted() {
167168
return sorted;
168169
}
169170

171+
public void setReservedMemoryBytes(long bytes) {
172+
this.reservedMemoryBytes = bytes;
173+
}
174+
175+
public void addReservedMemoryBytes(long bytes) {
176+
this.reservedMemoryBytes += bytes;
177+
}
178+
179+
public long getReservedMemoryBytes() {
180+
return reservedMemoryBytes;
181+
}
182+
170183
public abstract void sort();
171184

172185
public void increaseReferenceCount() {
@@ -327,13 +340,6 @@ protected void set(int index, long timestamp, int valueIndex) {
327340
int offset = i * ARRAY_SIZE;
328341
Arrays.setAll(indices.get(i), j -> offset + j);
329342
}
330-
// Reserve memory for indices if the TVList is owned by a query
331-
if (ownerQuery != null) {
332-
long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L;
333-
MemoryReservationManager memoryReservationManager =
334-
((FragmentInstanceContext) ownerQuery).getMemoryReservationContext();
335-
memoryReservationManager.reserveMemoryCumulatively(indicesBytes);
336-
}
337343
}
338344
indices.get(arrayIndex)[elementIndex] = valueIndex;
339345
}

0 commit comments

Comments
 (0)