Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.filter.factory.FilterFactory;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -902,19 +903,32 @@ private void releaseTVListOwnedByQuery() {
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
if (queryContextSet.isEmpty()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
}
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
tvList.clear();
} else {
// Transfer memory to next query. It must be exception-safe as this method is called
// during FragmentInstanceExecution cleanup. Any exception during this process could
// prevent proper resource cleanup and cause memory leaks.
Pair<Long, Long> releasedBytes =
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
LOGGER.debug(
"TVList {} is now owned by another query, FragmentInstance Id is {}",
tvList,
queryContext.getId());
queryContext
.getMemoryReservationContext()
.reserveMemoryVirtually(releasedBytes.left, releasedBytes.right);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"TVList {} is now owned by another query, FragmentInstance Id is {}",
tvList,
queryContext.getId());
}
tvList.setOwnerQuery(queryContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,10 @@ public void reserveFromFreeMemoryForOperators(
final long reservedBytes,
final String queryId,
final String contextHolder) {
if (memoryInBytes <= 0) {
throw new IllegalArgumentException(
"Bytes to reserve from free memory for operators should be larger than 0");
}
if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
Expand All @@ -317,6 +321,10 @@ public void reserveFromFreeMemoryForOperators(
}

/**
 * Returns previously reserved bytes to the operators memory block.
 *
 * @param memoryInBytes number of bytes to give back; must be strictly positive
 * @throws IllegalArgumentException if {@code memoryInBytes} is zero or negative
 */
public void releaseToFreeMemoryForOperators(final long memoryInBytes) {
  final boolean isPositive = memoryInBytes > 0;
  if (!isPositive) {
    // Reject non-positive amounts up front instead of handing them to the memory block.
    throw new IllegalArgumentException(
        "Bytes to release to free memory for operators should be larger than 0");
  }
  OPERATORS_MEMORY_BLOCK.release(memoryInBytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.planner.memory;

import org.apache.tsfile.utils.Pair;

public class FakedMemoryReservationManager implements MemoryReservationManager {

@Override
Expand All @@ -32,4 +34,13 @@ public void releaseMemoryCumulatively(long size) {}

/** No-op: this faked manager has an empty body here, so there is nothing to release. */
@Override
public void releaseAllReservedMemory() {}

/**
 * Faked virtual release: no state is touched and the reported amounts are always zero.
 *
 * @param size ignored by this implementation
 * @return a pair whose left and right elements are both {@code 0L}
 */
@Override
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
  final Pair<Long, Long> nothingReleased = new Pair<>(0L, 0L);
  return nothingReleased;
}

/** No-op: this faked manager ignores both amounts and records nothing. */
@Override
public void reserveMemoryVirtually(
final long bytesToBeReserved, final long bytesAlreadyReserved) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.iotdb.db.queryengine.plan.planner.memory;

import org.apache.tsfile.utils.Pair;

public interface MemoryReservationManager {
/**
* Reserve memory for the given size. The memory reservation request will be accumulated and the
Expand All @@ -43,4 +45,31 @@ public interface MemoryReservationManager {
* this manager ends, Or the memory to be released in the batch may not be released correctly.
*/
void releaseAllReservedMemory();

/**
 * Release memory virtually, i.e. without actually freeing it. This is used for memory
 * reservation transfer scenarios where ownership of already-accounted memory needs to be handed
 * over between different FragmentInstances without any real deallocation.
 *
 * <p>NOTE: Callers must guarantee that bytesToBeReserved + reservedBytesInTotal >= size,
 * otherwise the reservation bookkeeping may become negative.
 *
 * @param size the size of memory to release virtually
 * @return a Pair where the left element is the amount of memory released from the pending
 *     reservation (bytesToBeReserved), and the right element is the amount of memory released
 *     from what has already been reserved
 */
Pair<Long, Long> releaseMemoryVirtually(final long size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some java doc about the return value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/**
 * Reserve memory virtually, i.e. without actually allocating new memory. This is used to
 * transfer memory ownership from one FragmentInstance to another by taking over the memory that
 * was previously released virtually. It updates the internal reservation state without changing
 * the actual memory allocation.
 *
 * @param bytesToBeReserved the amount of memory that still needs to be reserved cumulatively
 * @param bytesAlreadyReserved the amount of memory that has already been reserved
 */
void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;

import org.apache.tsfile.utils.Pair;

import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
Expand Down Expand Up @@ -91,4 +93,25 @@ public void releaseAllReservedMemory() {
bytesToBeReleased = 0;
}
}

/**
 * Gives up {@code size} bytes of bookkeeping without touching the real memory pool. The pending
 * amount ({@code bytesToBeReserved}) is consumed first; only the remainder is deducted from
 * {@code reservedBytesInTotal}.
 *
 * @param size number of bytes to release virtually
 * @return left: bytes taken from the pending amount; right: bytes taken from the reserved total
 */
@Override
public Pair<Long, Long> releaseMemoryVirtually(final long size) {
  // Simple case: the pending amount alone covers the whole request.
  if (size <= bytesToBeReserved) {
    bytesToBeReserved -= size;
    return new Pair<>(size, 0L);
  }

  // Otherwise drain the pending amount completely and take the rest from the reserved total.
  final long takenFromPending = bytesToBeReserved;
  final long takenFromReserved = size - takenFromPending;
  bytesToBeReserved = 0;
  reservedBytesInTotal -= takenFromReserved;
  return new Pair<>(takenFromPending, takenFromReserved);
}

/**
 * Takes over bookkeeping for memory that another manager released virtually.
 *
 * @param bytesToBeReserved amount forwarded to {@code reserveMemoryCumulatively}
 * @param bytesAlreadyReserved amount added directly to {@code reservedBytesInTotal}
 */
@Override
public void reserveMemoryVirtually(
    final long bytesToBeReserved, final long bytesAlreadyReserved) {
  // The parameter shadows the field of the same name, so the field is addressed via "this".
  this.reservedBytesInTotal += bytesAlreadyReserved;
  this.reserveMemoryCumulatively(bytesToBeReserved);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.iotdb.db.queryengine.common.QueryId;

import org.apache.tsfile.utils.Pair;

import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
Expand Down Expand Up @@ -48,4 +50,15 @@ public synchronized void releaseMemoryCumulatively(long size) {
public synchronized void releaseAllReservedMemory() {
super.releaseAllReservedMemory();
}

/**
 * Thread-safe variant of the virtual release.
 *
 * <p>Declared {@code synchronized} like the other overrides in this class: the superclass
 * implementation performs an unsynchronized read-modify-write of the reservation counters, so
 * it must not interleave with concurrent reserve/release calls on the same manager.
 *
 * @param size number of bytes to release virtually
 * @return the pair produced by the superclass implementation
 */
@Override
public synchronized Pair<Long, Long> releaseMemoryVirtually(final long size) {
  return super.releaseMemoryVirtually(size);
}

/**
 * Thread-safe variant of the virtual reservation.
 *
 * <p>Declared {@code synchronized} like the other overrides in this class so that the counter
 * updates done by the superclass cannot interleave with concurrent reserve/release calls on
 * the same manager.
 *
 * @param bytesToBeReserved the amount of memory that still needs to be reserved cumulatively
 * @param bytesAlreadyReserved the amount of memory that has already been reserved
 */
@Override
public synchronized void reserveMemoryVirtually(
    final long bytesToBeReserved, final long bytesAlreadyReserved) {
  super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.fragment;

import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
Expand All @@ -29,17 +30,26 @@
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;

import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -50,32 +60,11 @@ public void testFragmentInstanceExecution() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
DataRegion dataRegion = Mockito.mock(DataRegion.class);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
fragmentInstanceContext.initializeNumOfDrivers(1);
fragmentInstanceContext.setMayHaveTmpFile(true);
fragmentInstanceContext.setDataRegion(dataRegion);
List<IDriver> drivers = Collections.emptyList();
ISink sinkHandle = Mockito.mock(ISink.class);
long timeOut = -1;
MPPDataExchangeManager exchangeManager = Mockito.mock(MPPDataExchangeManager.class);
FragmentInstanceExecution execution =
FragmentInstanceExecution.createFragmentInstanceExecution(
scheduler,
instanceId,
fragmentInstanceContext,
drivers,
sinkHandle,
stateMachine,
timeOut,
false,
exchangeManager);
createFragmentInstanceExecution(0, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext = execution.getFragmentInstanceContext();
FragmentInstanceStateMachine stateMachine = execution.getStateMachine();

assertEquals(FragmentInstanceState.RUNNING, execution.getInstanceState());
FragmentInstanceInfo instanceInfo = execution.getInstanceInfo();
assertEquals(FragmentInstanceState.RUNNING, instanceInfo.getState());
Expand All @@ -84,7 +73,7 @@ public void testFragmentInstanceExecution() {
assertEquals(fragmentInstanceContext.getFailureInfoList(), instanceInfo.getFailureInfoList());

assertEquals(fragmentInstanceContext.getStartTime(), execution.getStartTime());
assertEquals(timeOut, execution.getTimeoutInMs());
assertEquals(-1, execution.getTimeoutInMs());
assertEquals(stateMachine, execution.getStateMachine());

fragmentInstanceContext.decrementNumOfUnClosedDriver();
Expand All @@ -107,4 +96,112 @@ public void testFragmentInstanceExecution() {
instanceNotificationExecutor.shutdown();
}
}

/**
 * Verifies that when the query owning a TVList finishes while another query still references
 * it, ownership is handed over without printing the "memory cost to be released is larger"
 * warning. Everything printed to {@code System.out} during the scenario is captured and
 * inspected afterwards.
 */
@Test
public void testTVListOwnerTransfer() throws InterruptedException {
  // Capture System.out to check for warning messages
  PrintStream systemOut = System.out;
  ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
  System.setOut(new PrintStream(logPrint));

  try {
    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);

    ExecutorService instanceNotificationExecutor =
        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
    try {
      // TVList
      TVList tvList = buildTVList();

      // FragmentInstance Context & Execution
      FragmentInstanceExecution execution1 =
          createFragmentInstanceExecution(1, instanceNotificationExecutor);
      FragmentInstanceContext fragmentInstanceContext1 = execution1.getFragmentInstanceContext();
      fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
      tvList.getQueryContextSet().add(fragmentInstanceContext1);

      FragmentInstanceExecution execution2 =
          createFragmentInstanceExecution(2, instanceNotificationExecutor);
      FragmentInstanceContext fragmentInstanceContext2 = execution2.getFragmentInstanceContext();
      fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
      tvList.getQueryContextSet().add(fragmentInstanceContext2);

      // mock flush's behavior
      fragmentInstanceContext1
          .getMemoryReservationContext()
          .reserveMemoryCumulatively(tvList.calculateRamSize());
      tvList.setOwnerQuery(fragmentInstanceContext1);

      fragmentInstanceContext1.decrementNumOfUnClosedDriver();
      fragmentInstanceContext2.decrementNumOfUnClosedDriver();

      // Finish the owner first so that it has to hand the TVList over to the second query.
      fragmentInstanceContext1.getStateMachine().finished();
      Thread.sleep(100);
      fragmentInstanceContext2.getStateMachine().finished();

      assertTrue(execution1.getInstanceState().isDone());
      assertTrue(execution2.getInstanceState().isDone());
      Thread.sleep(100);
    } catch (CpuNotEnoughException | MemoryNotEnoughException | IllegalArgumentException e) {
      fail(e.getMessage());
    } finally {
      instanceNotificationExecutor.shutdown();
    }
  } finally {
    // Restore original System.out (the stream that was replaced above), leaving System.err
    // untouched.
    System.setOut(systemOut);

    // should not contain warn message: "The memory cost to be released is larger than the memory
    // cost of memory block"
    String capturedOutput = logPrint.toString();
    assertFalse(
        "Should not contain warning message",
        capturedOutput.contains("The memory cost to be released is larger than the memory"));
  }
}

/**
 * Test helper that builds a {@link FragmentInstanceExecution} with mocked collaborators, no
 * drivers and a timeout of {@code -1}.
 *
 * @param id used both as the plan fragment id and (stringified) as the instance id
 * @param executor executor handed to the instance's state machine
 * @return the freshly created execution
 * @throws CpuNotEnoughException propagated from the execution factory method
 */
private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
    throws CpuNotEnoughException {
  // Identity and state machine of the instance under test.
  final PlanFragmentId fragmentId = new PlanFragmentId(MOCK_QUERY_ID, id);
  final FragmentInstanceId fragmentInstanceId =
      new FragmentInstanceId(fragmentId, String.valueOf(id));
  final FragmentInstanceStateMachine machine =
      new FragmentInstanceStateMachine(fragmentInstanceId, executor);

  // Mocked collaborators.
  final IDriverScheduler mockedScheduler = Mockito.mock(IDriverScheduler.class);
  final DataRegion mockedRegion = Mockito.mock(DataRegion.class);

  // Context configured with a single driver, a possible tmp file and the mocked region.
  final FragmentInstanceContext context =
      createFragmentInstanceContext(fragmentInstanceId, machine);
  context.initializeNumOfDrivers(1);
  context.setMayHaveTmpFile(true);
  context.setDataRegion(mockedRegion);

  final List<IDriver> noDrivers = Collections.emptyList();
  final ISink mockedSink = Mockito.mock(ISink.class);
  final MPPDataExchangeManager mockedExchangeManager =
      Mockito.mock(MPPDataExchangeManager.class);

  return FragmentInstanceExecution.createFragmentInstanceExecution(
      mockedScheduler,
      fragmentInstanceId,
      context,
      noDrivers,
      mockedSink,
      machine,
      -1L,
      false,
      mockedExchangeManager);
}

/**
 * Builds an aligned TVList with 200 INT64 columns, inserting one row of {@code 1L} values for
 * every timestamp from 1 (inclusive) to 1000 (exclusive).
 *
 * @return the populated list
 */
private TVList buildTVList() {
  final int columnCount = 200;
  final int timestampBound = 1000;

  // Every column has the same type and every cell the same value.
  final List<TSDataType> columnTypes =
      new ArrayList<>(Collections.nCopies(columnCount, TSDataType.INT64));
  final Object[] rowValues = new Object[columnCount];
  int column = 0;
  while (column < columnCount) {
    rowValues[column] = 1L;
    column++;
  }

  final AlignedTVList alignedList = AlignedTVList.newAlignedList(columnTypes);
  long timestamp = 1;
  while (timestamp < timestampBound) {
    alignedList.putAlignedValue(timestamp, rowValues);
    timestamp++;
  }
  return alignedList;
}
}