Skip to content

Commit 7722963

Browse files
authored
fix: memory leak during tvlist owner transfer scenario (#16932)
1 parent 302864a commit 7722963

File tree

7 files changed

+229
-34
lines changed

7 files changed

+229
-34
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.tsfile.read.common.TimeRange;
6161
import org.apache.tsfile.read.filter.basic.Filter;
6262
import org.apache.tsfile.read.filter.factory.FilterFactory;
63+
import org.apache.tsfile.utils.Pair;
6364
import org.apache.tsfile.utils.RamUsageEstimator;
6465
import org.slf4j.Logger;
6566
import org.slf4j.LoggerFactory;
@@ -902,19 +903,32 @@ private void releaseTVListOwnedByQuery() {
902903
queryContextSet.remove(this);
903904
if (tvList.getOwnerQuery() == this) {
904905
if (queryContextSet.isEmpty()) {
905-
LOGGER.debug(
906-
"TVList {} is released by the query, FragmentInstance Id is {}",
907-
tvList,
908-
this.getId());
906+
if (LOGGER.isDebugEnabled()) {
907+
LOGGER.debug(
908+
"TVList {} is released by the query, FragmentInstance Id is {}",
909+
tvList,
910+
this.getId());
911+
}
909912
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
910913
tvList.clear();
911914
} else {
915+
// Transfer memory to next query. It must be exception-safe as this method is called
916+
// during FragmentInstanceExecution cleanup. Any exception during this process could
917+
// prevent proper resource cleanup and cause memory leaks.
918+
Pair<Long, Long> releasedBytes =
919+
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
912920
FragmentInstanceContext queryContext =
913921
(FragmentInstanceContext) queryContextSet.iterator().next();
914-
LOGGER.debug(
915-
"TVList {} is now owned by another query, FragmentInstance Id is {}",
916-
tvList,
917-
queryContext.getId());
922+
queryContext
923+
.getMemoryReservationContext()
924+
.reserveMemoryVirtually(releasedBytes.left, releasedBytes.right);
925+
926+
if (LOGGER.isDebugEnabled()) {
927+
LOGGER.debug(
928+
"TVList {} is now owned by another query, FragmentInstance Id is {}",
929+
tvList,
930+
queryContext.getId());
931+
}
918932
tvList.setOwnerQuery(queryContext);
919933
}
920934
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ public void reserveFromFreeMemoryForOperators(
294294
final long reservedBytes,
295295
final String queryId,
296296
final String contextHolder) {
297+
if (memoryInBytes <= 0) {
298+
throw new IllegalArgumentException(
299+
"Bytes to reserve from free memory for operators should be larger than 0");
300+
}
297301
if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
298302
if (LOGGER.isDebugEnabled()) {
299303
LOGGER.debug(
@@ -317,6 +321,10 @@ public void reserveFromFreeMemoryForOperators(
317321
}
318322

319323
public void releaseToFreeMemoryForOperators(final long memoryInBytes) {
324+
if (memoryInBytes <= 0) {
325+
throw new IllegalArgumentException(
326+
"Bytes to release to free memory for operators should be larger than 0");
327+
}
320328
OPERATORS_MEMORY_BLOCK.release(memoryInBytes);
321329
}
322330

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public class FakedMemoryReservationManager implements MemoryReservationManager {
2325

2426
@Override
@@ -32,4 +34,13 @@ public void releaseMemoryCumulatively(long size) {}
3234

3335
@Override
3436
public void releaseAllReservedMemory() {}
37+
38+
@Override
39+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
40+
return new Pair<>(0L, 0L);
41+
}
42+
43+
@Override
44+
public void reserveMemoryVirtually(
45+
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
3546
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public interface MemoryReservationManager {
2325
/**
2426
* Reserve memory for the given size. The memory reservation request will be accumulated and the
@@ -43,4 +45,31 @@ public interface MemoryReservationManager {
4345
* this manager ends, Or the memory to be released in the batch may not be released correctly.
4446
*/
4547
void releaseAllReservedMemory();
48+
49+
/**
50+
* Release memory virtually without actually freeing the memory. This is used for memory
51+
* reservation transfer scenarios where memory ownership needs to be transferred between different
52+
* FragmentInstances without actual memory deallocation.
53+
*
54+
* <p>NOTE: When calling this method, it should be guaranteed that bytesToBeReserved +
55+
* reservedBytesInTotal >= size to ensure proper memory accounting and prevent negative
56+
* reservation values.
57+
*
58+
* @param size the size of memory to release virtually
59+
* @return a Pair where the left element is the amount of memory released from the pending
60+
* reservation queue (bytesToBeReserved), and the right element is the amount of memory that
61+
* has already been reserved
62+
*/
63+
Pair<Long, Long> releaseMemoryVirtually(final long size);
64+
65+
/**
66+
* Reserve memory virtually without actually allocating new memory. This is used to transfer
67+
* memory ownership from one FragmentInstances to another by reserving the memory that was
68+
* previously released virtually. It updates the internal reservation state without changing the
69+
* actual memory allocation.
70+
*
71+
* @param bytesToBeReserved the amount of memory that needs to be reserved cumulatively.
72+
* @param bytesAlreadyReserved the amount of memory that has already been reserved
73+
*/
74+
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
4675
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
2424

25+
import org.apache.tsfile.utils.Pair;
26+
2527
import javax.annotation.concurrent.NotThreadSafe;
2628

2729
@NotThreadSafe
@@ -91,4 +93,25 @@ public void releaseAllReservedMemory() {
9193
bytesToBeReleased = 0;
9294
}
9395
}
96+
97+
@Override
98+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
99+
if (bytesToBeReserved >= size) {
100+
bytesToBeReserved -= size;
101+
return new Pair<>(size, 0L);
102+
} else {
103+
long releasedBytesInReserved = bytesToBeReserved;
104+
long releasedBytesInTotal = size - bytesToBeReserved;
105+
bytesToBeReserved = 0;
106+
reservedBytesInTotal -= releasedBytesInTotal;
107+
return new Pair<>(releasedBytesInReserved, releasedBytesInTotal);
108+
}
109+
}
110+
111+
@Override
112+
public void reserveMemoryVirtually(
113+
final long bytesToBeReserved, final long bytesAlreadyReserved) {
114+
reservedBytesInTotal += bytesAlreadyReserved;
115+
reserveMemoryCumulatively(bytesToBeReserved);
116+
}
94117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323

24+
import org.apache.tsfile.utils.Pair;
25+
2426
import javax.annotation.concurrent.ThreadSafe;
2527

2628
@ThreadSafe
@@ -48,4 +50,15 @@ public synchronized void releaseMemoryCumulatively(long size) {
4850
public synchronized void releaseAllReservedMemory() {
4951
super.releaseAllReservedMemory();
5052
}
53+
54+
@Override
55+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
56+
return super.releaseMemoryVirtually(size);
57+
}
58+
59+
@Override
60+
public void reserveMemoryVirtually(
61+
final long bytesToBeReserved, final long bytesAlreadyReserved) {
62+
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
63+
}
5164
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 123 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2425
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2526
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,17 +30,26 @@
2930
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3031
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3132
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
33+
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
34+
import org.apache.iotdb.db.utils.datastructure.TVList;
3235

36+
import com.google.common.collect.ImmutableMap;
37+
import org.apache.tsfile.enums.TSDataType;
3338
import org.junit.Test;
3439
import org.mockito.Mockito;
3540

41+
import java.io.ByteArrayOutputStream;
42+
import java.io.PrintStream;
43+
import java.util.ArrayList;
3644
import java.util.Collections;
3745
import java.util.List;
46+
import java.util.concurrent.Executor;
3847
import java.util.concurrent.ExecutorService;
3948

4049
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
4150
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
4251
import static org.junit.Assert.assertEquals;
52+
import static org.junit.Assert.assertFalse;
4353
import static org.junit.Assert.assertTrue;
4454
import static org.junit.Assert.fail;
4555

@@ -50,32 +60,11 @@ public void testFragmentInstanceExecution() {
5060
ExecutorService instanceNotificationExecutor =
5161
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
5262
try {
53-
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
54-
FragmentInstanceId instanceId =
55-
new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
56-
FragmentInstanceStateMachine stateMachine =
57-
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
58-
DataRegion dataRegion = Mockito.mock(DataRegion.class);
59-
FragmentInstanceContext fragmentInstanceContext =
60-
createFragmentInstanceContext(instanceId, stateMachine);
61-
fragmentInstanceContext.initializeNumOfDrivers(1);
62-
fragmentInstanceContext.setMayHaveTmpFile(true);
63-
fragmentInstanceContext.setDataRegion(dataRegion);
64-
List<IDriver> drivers = Collections.emptyList();
65-
ISink sinkHandle = Mockito.mock(ISink.class);
66-
long timeOut = -1;
67-
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
6863
FragmentInstanceExecution execution =
69-
FragmentInstanceExecution.createFragmentInstanceExecution(
70-
scheduler,
71-
instanceId,
72-
fragmentInstanceContext,
73-
drivers,
74-
sinkHandle,
75-
stateMachine,
76-
timeOut,
77-
false,
78-
exchangeManager);
64+
createFragmentInstanceExecution(0, instanceNotificationExecutor);
65+
FragmentInstanceContext fragmentInstanceContext = execution.getFragmentInstanceContext();
66+
FragmentInstanceStateMachine stateMachine = execution.getStateMachine();
67+
7968
assertEquals(FragmentInstanceState.RUNNING, execution.getInstanceState());
8069
FragmentInstanceInfo instanceInfo = execution.getInstanceInfo();
8170
assertEquals(FragmentInstanceState.RUNNING, instanceInfo.getState());
@@ -84,7 +73,7 @@ public void testFragmentInstanceExecution() {
8473
assertEquals(fragmentInstanceContext.getFailureInfoList(), instanceInfo.getFailureInfoList());
8574

8675
assertEquals(fragmentInstanceContext.getStartTime(), execution.getStartTime());
87-
assertEquals(timeOut, execution.getTimeoutInMs());
76+
assertEquals(-1, execution.getTimeoutInMs());
8877
assertEquals(stateMachine, execution.getStateMachine());
8978

9079
fragmentInstanceContext.decrementNumOfUnClosedDriver();
@@ -107,4 +96,112 @@ public void testFragmentInstanceExecution() {
10796
instanceNotificationExecutor.shutdown();
10897
}
10998
}
99+
100+
@Test
101+
public void testTVListOwnerTransfer() throws InterruptedException {
102+
// Capture System.err to check for warning messages
103+
PrintStream systemOut = System.out;
104+
ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
105+
System.setOut(new PrintStream(logPrint));
106+
107+
try {
108+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
109+
110+
ExecutorService instanceNotificationExecutor =
111+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
112+
try {
113+
// TVList
114+
TVList tvList = buildTVList();
115+
116+
// FragmentInstance Context & Execution
117+
FragmentInstanceExecution execution1 =
118+
createFragmentInstanceExecution(1, instanceNotificationExecutor);
119+
FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
120+
fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
121+
tvList.getQueryContextSet().add(fragmentInstanceContext1);
122+
123+
FragmentInstanceExecution execution2 =
124+
createFragmentInstanceExecution(2, instanceNotificationExecutor);
125+
FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
126+
fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
127+
tvList.getQueryContextSet().add(fragmentInstanceContext2);
128+
129+
// mock flush's behavior
130+
fragmentInstanceContext1
131+
.getMemoryReservationContext()
132+
.reserveMemoryCumulatively(tvList.calculateRamSize());
133+
tvList.setOwnerQuery(fragmentInstanceContext1);
134+
135+
fragmentInstanceContext1.decrementNumOfUnClosedDriver();
136+
fragmentInstanceContext2.decrementNumOfUnClosedDriver();
137+
138+
fragmentInstanceContext1.getStateMachine().finished();
139+
Thread.sleep(100);
140+
fragmentInstanceContext2.getStateMachine().finished();
141+
142+
assertTrue(execution1.getInstanceState().isDone());
143+
assertTrue(execution2.getInstanceState().isDone());
144+
Thread.sleep(100);
145+
} catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) {
146+
fail(e.getMessage());
147+
} finally {
148+
instanceNotificationExecutor.shutdown();
149+
}
150+
} finally {
151+
// Restore original System.out
152+
System.setErr(systemOut);
153+
154+
// should not contain warn message: "The memory cost to be released is larger than the memory
155+
// cost of memory block"
156+
String capturedOutput = logPrint.toString();
157+
assertFalse(
158+
"Should not contain warning message",
159+
capturedOutput.contains("The memory cost to be released is larger than the memory"));
160+
}
161+
}
162+
163+
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
164+
throws CpuNotEnoughException {
165+
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
166+
FragmentInstanceId instanceId =
167+
new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, id), String.valueOf(id));
168+
FragmentInstanceStateMachine stateMachine =
169+
new FragmentInstanceStateMachine(instanceId, executor);
170+
DataRegion dataRegion = Mockito.mock(DataRegion.class);
171+
FragmentInstanceContext fragmentInstanceContext =
172+
createFragmentInstanceContext(instanceId, stateMachine);
173+
fragmentInstanceContext.initializeNumOfDrivers(1);
174+
fragmentInstanceContext.setMayHaveTmpFile(true);
175+
fragmentInstanceContext.setDataRegion(dataRegion);
176+
List<IDriver> drivers = Collections.emptyList();
177+
ISink sinkHandle = Mockito.mock(ISink.class);
178+
long timeOut = -1;
179+
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
180+
return FragmentInstanceExecution.createFragmentInstanceExecution(
181+
scheduler,
182+
instanceId,
183+
fragmentInstanceContext,
184+
drivers,
185+
sinkHandle,
186+
stateMachine,
187+
timeOut,
188+
false,
189+
exchangeManager);
190+
}
191+
192+
private TVList buildTVList() {
193+
int columns = 200;
194+
int rows = 1000;
195+
List<TSDataType> dataTypes = new ArrayList<>();
196+
Object[] values = new Object[columns];
197+
for (int i = 0; i < columns; i++) {
198+
dataTypes.add(TSDataType.INT64);
199+
values[i] = 1L;
200+
}
201+
AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
202+
for (long t = 1; t < rows; t++) {
203+
tvList.putAlignedValue(t, values);
204+
}
205+
return tvList;
206+
}
110207
}

0 commit comments

Comments
 (0)