Skip to content

Commit e020df2

Browse files
authored
[To dev/1.3] fix: memory to be released is larger than the memory of memory block in TVList owner transfer case #16943 Open
1 parent d2622f6 commit e020df2

File tree

7 files changed

+230
-35
lines changed

7 files changed

+230
-35
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.tsfile.file.metadata.IDeviceID;
5858
import org.apache.tsfile.read.common.TimeRange;
5959
import org.apache.tsfile.read.filter.basic.Filter;
60+
import org.apache.tsfile.utils.Pair;
6061
import org.apache.tsfile.utils.RamUsageEstimator;
6162
import org.slf4j.Logger;
6263
import org.slf4j.LoggerFactory;
@@ -873,20 +874,33 @@ private void releaseTVListOwnedByQuery() {
873874
try {
874875
queryContextSet.remove(this);
875876
if (tvList.getOwnerQuery() == this) {
876-
if (queryContextSet.isEmpty()) {
877-
LOGGER.debug(
878-
"TVList {} is released by the query, FragmentInstance Id is {}",
879-
tvList,
880-
this.getId());
877+
if (LOGGER.isDebugEnabled()) {
878+
if (queryContextSet.isEmpty()) {
879+
LOGGER.debug(
880+
"TVList {} is released by the query, FragmentInstance Id is {}",
881+
tvList,
882+
this.getId());
883+
}
881884
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
882885
tvList.clear();
883886
} else {
887+
// Transfer memory to next query. It must be exception-safe as this method is called
888+
// during FragmentInstanceExecution cleanup. Any exception during this process could
889+
// prevent proper resource cleanup and cause memory leaks.
890+
Pair<Long, Long> releasedBytes =
891+
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
884892
FragmentInstanceContext queryContext =
885893
(FragmentInstanceContext) queryContextSet.iterator().next();
886-
LOGGER.debug(
887-
"TVList {} is now owned by another query, FragmentInstance Id is {}",
888-
tvList,
889-
queryContext.getId());
894+
queryContext
895+
.getMemoryReservationContext()
896+
.reserveMemoryVirtually(releasedBytes.left, releasedBytes.right);
897+
898+
if (LOGGER.isDebugEnabled()) {
899+
LOGGER.debug(
900+
"TVList {} is now owned by another query, FragmentInstance Id is {}",
901+
tvList,
902+
queryContext.getId());
903+
}
890904
tvList.setOwnerQuery(queryContext);
891905
}
892906
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ public synchronized void reserveFromFreeMemoryForOperators(
244244
final long reservedBytes,
245245
final String queryId,
246246
final String contextHolder) {
247+
if (memoryInBytes <= 0) {
248+
throw new IllegalArgumentException(
249+
"Bytes to reserve from free memory for operators should be larger than 0");
250+
}
247251
if (memoryInBytes > freeMemoryForOperators) {
248252
throw new MemoryNotEnoughException(
249253
String.format(
@@ -264,6 +268,10 @@ public synchronized void reserveFromFreeMemoryForOperators(
264268
}
265269

266270
public synchronized void releaseToFreeMemoryForOperators(final long memoryInBytes) {
271+
if (memoryInBytes <= 0) {
272+
throw new IllegalArgumentException(
273+
"Bytes to release to free memory for operators should be larger than 0");
274+
}
267275
freeMemoryForOperators += memoryInBytes;
268276

269277
if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public class FakedMemoryReservationManager implements MemoryReservationManager {
2325

2426
@Override
@@ -32,4 +34,13 @@ public void releaseMemoryCumulatively(long size) {}
3234

3335
@Override
3436
public void releaseAllReservedMemory() {}
37+
38+
@Override
39+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
40+
return new Pair<>(0L, 0L);
41+
}
42+
43+
@Override
44+
public void reserveMemoryVirtually(
45+
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
3546
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.planner.memory;
2121

22+
import org.apache.tsfile.utils.Pair;
23+
2224
public interface MemoryReservationManager {
2325
/**
2426
* Reserve memory for the given size. The memory reservation request will be accumulated and the
@@ -43,4 +45,31 @@ public interface MemoryReservationManager {
4345
* this manager ends, Or the memory to be released in the batch may not be released correctly.
4446
*/
4547
void releaseAllReservedMemory();
48+
49+
/**
50+
* Release memory virtually without actually freeing the memory. This is used for memory
51+
* reservation transfer scenarios where memory ownership needs to be transferred between different
52+
* FragmentInstances without actual memory deallocation.
53+
*
54+
* <p>NOTE: When calling this method, it should be guaranteed that bytesToBeReserved +
55+
* reservedBytesInTotal >= size to ensure proper memory accounting and prevent negative
56+
* reservation values.
57+
*
58+
* @param size the size of memory to release virtually
59+
* @return a Pair where the left element is the amount of memory released from the pending
60+
* reservation queue (bytesToBeReserved), and the right element is the amount of memory that
61+
* has already been reserved
62+
*/
63+
Pair<Long, Long> releaseMemoryVirtually(final long size);
64+
65+
/**
66+
* Reserve memory virtually without actually allocating new memory. This is used to transfer
67+
* memory ownership from one FragmentInstances to another by reserving the memory that was
68+
* previously released virtually. It updates the internal reservation state without changing the
69+
* actual memory allocation.
70+
*
71+
* @param bytesToBeReserved the amount of memory that needs to be reserved cumulatively.
72+
* @param bytesAlreadyReserved the amount of memory that has already been reserved
73+
*/
74+
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
4675
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
2424

25+
import org.apache.tsfile.utils.Pair;
26+
2527
import javax.annotation.concurrent.NotThreadSafe;
2628

2729
@NotThreadSafe
@@ -91,4 +93,25 @@ public void releaseAllReservedMemory() {
9193
bytesToBeReleased = 0;
9294
}
9395
}
96+
97+
@Override
98+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
99+
if (bytesToBeReserved >= size) {
100+
bytesToBeReserved -= size;
101+
return new Pair<>(size, 0L);
102+
} else {
103+
long releasedBytesInReserved = bytesToBeReserved;
104+
long releasedBytesInTotal = size - bytesToBeReserved;
105+
bytesToBeReserved = 0;
106+
reservedBytesInTotal -= releasedBytesInTotal;
107+
return new Pair<>(releasedBytesInReserved, releasedBytesInTotal);
108+
}
109+
}
110+
111+
@Override
112+
public void reserveMemoryVirtually(
113+
final long bytesToBeReserved, final long bytesAlreadyReserved) {
114+
reservedBytesInTotal += bytesAlreadyReserved;
115+
reserveMemoryCumulatively(bytesToBeReserved);
116+
}
94117
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.iotdb.db.queryengine.common.QueryId;
2323

24+
import org.apache.tsfile.utils.Pair;
25+
2426
import javax.annotation.concurrent.ThreadSafe;
2527

2628
@ThreadSafe
@@ -48,4 +50,15 @@ public synchronized void releaseMemoryCumulatively(long size) {
4850
public synchronized void releaseAllReservedMemory() {
4951
super.releaseAllReservedMemory();
5052
}
53+
54+
@Override
55+
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
56+
return super.releaseMemoryVirtually(size);
57+
}
58+
59+
@Override
60+
public void reserveMemoryVirtually(
61+
final long bytesToBeReserved, final long bytesAlreadyReserved) {
62+
super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
63+
}
5164
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java

Lines changed: 123 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.queryengine.execution.fragment;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2425
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2526
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,17 +30,26 @@
2930
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
3031
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3132
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
33+
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
34+
import org.apache.iotdb.db.utils.datastructure.TVList;
3235

36+
import com.google.common.collect.ImmutableMap;
37+
import org.apache.tsfile.enums.TSDataType;
3338
import org.junit.Test;
3439
import org.mockito.Mockito;
3540

41+
import java.io.ByteArrayOutputStream;
42+
import java.io.PrintStream;
43+
import java.util.ArrayList;
3644
import java.util.Collections;
3745
import java.util.List;
46+
import java.util.concurrent.Executor;
3847
import java.util.concurrent.ExecutorService;
3948

4049
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
4150
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
4251
import static org.junit.Assert.assertEquals;
52+
import static org.junit.Assert.assertFalse;
4353
import static org.junit.Assert.assertTrue;
4454
import static org.junit.Assert.fail;
4555

@@ -50,32 +60,11 @@ public void testFragmentInstanceExecution() {
5060
ExecutorService instanceNotificationExecutor =
5161
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
5262
try {
53-
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
54-
FragmentInstanceId instanceId =
55-
new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
56-
FragmentInstanceStateMachine stateMachine =
57-
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
58-
DataRegion dataRegion = Mockito.mock(DataRegion.class);
59-
FragmentInstanceContext fragmentInstanceContext =
60-
createFragmentInstanceContext(instanceId, stateMachine);
61-
fragmentInstanceContext.initializeNumOfDrivers(1);
62-
fragmentInstanceContext.setMayHaveTmpFile(true);
63-
fragmentInstanceContext.setDataRegion(dataRegion);
64-
List<IDriver> drivers = Collections.emptyList();
65-
ISink sinkHandle = Mockito.mock(ISink.class);
66-
long timeOut = -1;
67-
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
6863
FragmentInstanceExecution execution =
69-
FragmentInstanceExecution.createFragmentInstanceExecution(
70-
scheduler,
71-
instanceId,
72-
fragmentInstanceContext,
73-
drivers,
74-
sinkHandle,
75-
stateMachine,
76-
timeOut,
77-
false,
78-
exchangeManager);
64+
createFragmentInstanceExecution(0, instanceNotificationExecutor);
65+
FragmentInstanceContext fragmentInstanceContext = execution.getFragmentInstanceContext();
66+
FragmentInstanceStateMachine stateMachine = execution.getStateMachine();
67+
7968
assertEquals(FragmentInstanceState.RUNNING, execution.getInstanceState());
8069
FragmentInstanceInfo instanceInfo = execution.getInstanceInfo();
8170
assertEquals(FragmentInstanceState.RUNNING, instanceInfo.getState());
@@ -84,7 +73,7 @@ public void testFragmentInstanceExecution() {
8473
assertEquals(fragmentInstanceContext.getFailureInfoList(), instanceInfo.getFailureInfoList());
8574

8675
assertEquals(fragmentInstanceContext.getStartTime(), execution.getStartTime());
87-
assertEquals(timeOut, execution.getTimeoutInMs());
76+
assertEquals(-1, execution.getTimeoutInMs());
8877
assertEquals(stateMachine, execution.getStateMachine());
8978

9079
fragmentInstanceContext.decrementNumOfUnClosedDriver();
@@ -107,4 +96,112 @@ public void testFragmentInstanceExecution() {
10796
instanceNotificationExecutor.shutdown();
10897
}
10998
}
99+
100+
@Test
101+
public void testTVListOwnerTransfer() throws InterruptedException {
102+
// Capture System.err to check for warning messages
103+
PrintStream systemOut = System.out;
104+
ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
105+
System.setOut(new PrintStream(logPrint));
106+
107+
try {
108+
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
109+
110+
ExecutorService instanceNotificationExecutor =
111+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
112+
try {
113+
// TVList
114+
TVList tvList = buildTVList();
115+
116+
// FragmentInstance Context & Execution
117+
FragmentInstanceExecution execution1 =
118+
createFragmentInstanceExecution(1, instanceNotificationExecutor);
119+
FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
120+
fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
121+
tvList.getQueryContextSet().add(fragmentInstanceContext1);
122+
123+
FragmentInstanceExecution execution2 =
124+
createFragmentInstanceExecution(2, instanceNotificationExecutor);
125+
FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
126+
fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
127+
tvList.getQueryContextSet().add(fragmentInstanceContext2);
128+
129+
// mock flush's behavior
130+
fragmentInstanceContext1
131+
.getMemoryReservationContext()
132+
.reserveMemoryCumulatively(tvList.calculateRamSize());
133+
tvList.setOwnerQuery(fragmentInstanceContext1);
134+
135+
fragmentInstanceContext1.decrementNumOfUnClosedDriver();
136+
fragmentInstanceContext2.decrementNumOfUnClosedDriver();
137+
138+
fragmentInstanceContext1.getStateMachine().finished();
139+
Thread.sleep(100);
140+
fragmentInstanceContext2.getStateMachine().finished();
141+
142+
assertTrue(execution1.getInstanceState().isDone());
143+
assertTrue(execution2.getInstanceState().isDone());
144+
Thread.sleep(100);
145+
} catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) {
146+
fail(e.getMessage());
147+
} finally {
148+
instanceNotificationExecutor.shutdown();
149+
}
150+
} finally {
151+
// Restore original System.out
152+
System.setErr(systemOut);
153+
154+
// should not contain warn message: "The memory cost to be released is larger than the memory
155+
// cost of memory block"
156+
String capturedOutput = logPrint.toString();
157+
assertFalse(
158+
"Should not contain error message",
159+
capturedOutput.contains("is more than allocated memory"));
160+
}
161+
}
162+
163+
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
164+
throws CpuNotEnoughException {
165+
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
166+
FragmentInstanceId instanceId =
167+
new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, id), String.valueOf(id));
168+
FragmentInstanceStateMachine stateMachine =
169+
new FragmentInstanceStateMachine(instanceId, executor);
170+
DataRegion dataRegion = Mockito.mock(DataRegion.class);
171+
FragmentInstanceContext fragmentInstanceContext =
172+
createFragmentInstanceContext(instanceId, stateMachine);
173+
fragmentInstanceContext.initializeNumOfDrivers(1);
174+
fragmentInstanceContext.setMayHaveTmpFile(true);
175+
fragmentInstanceContext.setDataRegion(dataRegion);
176+
List<IDriver> drivers = Collections.emptyList();
177+
ISink sinkHandle = Mockito.mock(ISink.class);
178+
long timeOut = -1;
179+
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
180+
return FragmentInstanceExecution.createFragmentInstanceExecution(
181+
scheduler,
182+
instanceId,
183+
fragmentInstanceContext,
184+
drivers,
185+
sinkHandle,
186+
stateMachine,
187+
timeOut,
188+
false,
189+
exchangeManager);
190+
}
191+
192+
private TVList buildTVList() {
193+
int columns = 200;
194+
int rows = 1000;
195+
List<TSDataType> dataTypes = new ArrayList<>();
196+
Object[] values = new Object[columns];
197+
for (int i = 0; i < columns; i++) {
198+
dataTypes.add(TSDataType.INT64);
199+
values[i] = 1L;
200+
}
201+
AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
202+
for (long t = 1; t < rows; t++) {
203+
tvList.putAlignedValue(t, values);
204+
}
205+
return tvList;
206+
}
110207
}

0 commit comments

Comments
 (0)