Skip to content

Commit 6c44769

Browse files
committed
fix: memory to be released is larger than the memory of memory block in TVList owner transfer case
1 parent 9abac5c commit 6c44769

File tree

6 files changed

+98
-8
lines changed

6 files changed

+98
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.tsfile.read.common.TimeRange;
6161
import org.apache.tsfile.read.filter.basic.Filter;
6262
import org.apache.tsfile.read.filter.factory.FilterFactory;
63+
import org.apache.tsfile.utils.Pair;
6364
import org.apache.tsfile.utils.RamUsageEstimator;
6465
import org.slf4j.Logger;
6566
import org.slf4j.LoggerFactory;
@@ -902,19 +903,32 @@ private void releaseTVListOwnedByQuery() {
902903
queryContextSet.remove(this);
903904
if (tvList.getOwnerQuery() == this) {
904905
if (queryContextSet.isEmpty()) {
905-
LOGGER.debug(
906-
"TVList {} is released by the query, FragmentInstance Id is {}",
907-
tvList,
908-
this.getId());
906+
if (LOGGER.isDebugEnabled()) {
907+
LOGGER.debug(
908+
"TVList {} is released by the query, FragmentInstance Id is {}",
909+
tvList,
910+
this.getId());
911+
}
909912
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
910913
tvList.clear();
911914
} else {
915+
// Transfer memory to next query. It must be exception-safe as this method is called
916+
// during FragmentInstanceExecution cleanup. Any exception during this process could
917+
// prevent proper resource cleanup and cause memory leaks.
918+
Pair<Long, Long> releasedBytes =
919+
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
912920
FragmentInstanceContext queryContext =
913921
(FragmentInstanceContext) queryContextSet.iterator().next();
914-
LOGGER.debug(
915-
"TVList {} is now owned by another query, FragmentInstance Id is {}",
916-
tvList,
917-
queryContext.getId());
922+
queryContext
923+
.getMemoryReservationContext()
924+
.reserveMemoryVirtually(releasedBytes.left, releasedBytes.right);
925+
926+
if (LOGGER.isDebugEnabled()) {
927+
LOGGER.debug(
928+
"TVList {} is now owned by another query, FragmentInstance Id is {}",
929+
tvList,
930+
queryContext.getId());
931+
}
918932
tvList.setOwnerQuery(queryContext);
919933
}
920934
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ public void reserveFromFreeMemoryForOperators(
294294
final long reservedBytes,
295295
final String queryId,
296296
final String contextHolder) {
297+
if (memoryInBytes <= 0) {
298+
throw new IllegalArgumentException(
299+
"Bytes to reserve from free memory for operators should be larger than 0");
300+
}
297301
if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
298302
if (LOGGER.isDebugEnabled()) {
299303
LOGGER.debug(
@@ -317,6 +321,10 @@ public void reserveFromFreeMemoryForOperators(
317321
}
318322

319323
public void releaseToFreeMemoryForOperators(final long memoryInBytes) {
324+
if (memoryInBytes <= 0) {
325+
throw new IllegalArgumentException(
326+
"Bytes to release to free memory for operators should be larger than 0");
327+
}
320328
OPERATORS_MEMORY_BLOCK.release(memoryInBytes);
321329
}
322330

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public class FakedMemoryReservationManager implements MemoryReservationManager {
2325

2426
@Override
@@ -32,4 +34,13 @@ public void releaseMemoryCumulatively(long size) {}
3234

3335
@Override
3436
public void releaseAllReservedMemory() {}
37+
38+
@Override
39+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
40+
return null;
41+
}
42+
43+
@Override
44+
public void reserveMemoryVirtually(
45+
final long releasedBytesInReserved, final long releasedBytesInTotal) {}
3546
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public interface MemoryReservationManager {
2325
/**
2426
* Reserve memory for the given size. The memory reservation request will be accumulated and the
@@ -43,4 +45,23 @@ public interface MemoryReservationManager {
4345
* this manager ends, Or the memory to be released in the batch may not be released correctly.
4446
*/
4547
void releaseAllReservedMemory();
48+
49+
/**
50+
* Release memory virtually without actually freeing the memory. This is used for memory
51+
* reservation transfer scenarios where memory ownership needs to be transferred between different
52+
* FragmentInstances without actual memory deallocation.
53+
*
54+
* <p>NOTE: When calling this method, it should be guaranteed that bytesToBeReserved +
55+
* reservedBytesInTotal >= size to ensure proper memory accounting and prevent negative
56+
* reservation values.
57+
*/
58+
Pair<Long, Long> releaseMemoryVirtually(final long size);
59+
60+
/**
61+
* Reserve memory virtually without actually allocating new memory. This is used to transfer
62+
* memory ownership from one FragmentInstances to another by reserving the memory that was
63+
* previously released virtually. It updates the internal reservation state without changing the
64+
* actual memory allocation.
65+
*/
66+
void reserveMemoryVirtually(final long releasedBytesInReserved, final long releasedBytesInTotal);
4667
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
2424

25+
import org.apache.tsfile.utils.Pair;
26+
2527
import javax.annotation.concurrent.NotThreadSafe;
2628

2729
@NotThreadSafe
@@ -91,4 +93,25 @@ public void releaseAllReservedMemory() {
9193
bytesToBeReleased = 0;
9294
}
9395
}
96+
97+
@Override
98+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
99+
if (bytesToBeReserved >= size) {
100+
bytesToBeReserved -= size;
101+
return new Pair<>(size, 0L);
102+
} else {
103+
long releasedBytesInReserved = bytesToBeReserved;
104+
long releasedBytesInTotal = size - bytesToBeReserved;
105+
bytesToBeReserved = 0;
106+
reservedBytesInTotal -= releasedBytesInTotal;
107+
return new Pair<>(releasedBytesInReserved, releasedBytesInTotal);
108+
}
109+
}
110+
111+
@Override
112+
public void reserveMemoryVirtually(
113+
final long releasedBytesInReserved, final long releasedBytesInTotal) {
114+
bytesToBeReserved += releasedBytesInReserved;
115+
reservedBytesInTotal += releasedBytesInTotal;
116+
}
94117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323

24+
import org.apache.tsfile.utils.Pair;
25+
2426
import javax.annotation.concurrent.ThreadSafe;
2527

2628
@ThreadSafe
@@ -48,4 +50,15 @@ public synchronized void releaseMemoryCumulatively(long size) {
4850
public synchronized void releaseAllReservedMemory() {
4951
super.releaseAllReservedMemory();
5052
}
53+
54+
@Override
55+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
56+
return super.releaseMemoryVirtually(size);
57+
}
58+
59+
@Override
60+
public void reserveMemoryVirtually(
61+
final long releasedBytesInReserved, final long releasedBytesInTotal) {
62+
super.reserveMemoryVirtually(releasedBytesInReserved, releasedBytesInTotal);
63+
}
5164
}

0 commit comments

Comments
 (0)