Skip to content

Commit 8d495bb

Browse files
committed
refactor memory management
1 parent 0df796b commit 8d495bb

File tree

6 files changed

+133
-147
lines changed

6 files changed

+133
-147
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,6 @@ public void setDatabaseName(@Nullable String databaseName) {
194194
* @param statementName the name of the prepared statement
195195
* @return the removed prepared statement info, or null if not found
196196
*/
197-
@Nullable
198197
public abstract PreparedStatementInfo removePreparedStatement(String statementName);
199198

200199
/**
@@ -203,7 +202,6 @@ public void setDatabaseName(@Nullable String databaseName) {
203202
* @param statementName the name of the prepared statement
204203
* @return the prepared statement info, or null if not found
205204
*/
206-
@Nullable
207205
public abstract PreparedStatementInfo getPreparedStatement(String statementName);
208206

209207
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/PreparedStatementInfo.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.protocol.session;
2121

22-
import org.apache.iotdb.commons.memory.IMemoryBlock;
2322
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
2423

2524
import java.util.Objects;
@@ -28,28 +27,28 @@
2827

2928
/**
3029
* Information about a prepared statement stored in a session. The AST is cached here to avoid
31-
* re-parsing on EXECUTE.
30+
* reparsing on EXECUTE.
3231
*/
3332
public class PreparedStatementInfo {
3433

3534
private final String statementName;
3635
private final Statement sql; // Cached AST (contains Parameter nodes)
3736
private final long createTime;
38-
private final IMemoryBlock memoryBlock; // Memory block allocated for this PreparedStatement
37+
private final long memorySizeInBytes; // Memory size allocated for this PreparedStatement
3938

40-
public PreparedStatementInfo(String statementName, Statement sql, IMemoryBlock memoryBlock) {
39+
public PreparedStatementInfo(String statementName, Statement sql, long memorySizeInBytes) {
4140
this.statementName = requireNonNull(statementName, "statementName is null");
4241
this.sql = requireNonNull(sql, "sql is null");
4342
this.createTime = System.currentTimeMillis();
44-
this.memoryBlock = memoryBlock;
43+
this.memorySizeInBytes = memorySizeInBytes;
4544
}
4645

4746
public PreparedStatementInfo(
48-
String statementName, Statement sql, long createTime, IMemoryBlock memoryBlock) {
47+
String statementName, Statement sql, long createTime, long memorySizeInBytes) {
4948
this.statementName = requireNonNull(statementName, "statementName is null");
5049
this.sql = requireNonNull(sql, "sql is null");
5150
this.createTime = createTime;
52-
this.memoryBlock = memoryBlock;
51+
this.memorySizeInBytes = memorySizeInBytes;
5352
}
5453

5554
public String getStatementName() {
@@ -64,8 +63,8 @@ public long getCreateTime() {
6463
return createTime;
6564
}
6665

67-
public IMemoryBlock getMemoryBlock() {
68-
return memoryBlock;
66+
public long getMemorySizeInBytes() {
67+
return memorySizeInBytes;
6968
}
7069

7170
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.iotdb.commons.conf.CommonConfig;
3030
import org.apache.iotdb.commons.conf.CommonDescriptor;
3131
import org.apache.iotdb.commons.conf.IoTDBConstant;
32+
import org.apache.iotdb.commons.memory.IMemoryBlock;
33+
import org.apache.iotdb.commons.memory.MemoryBlockType;
3234
import org.apache.iotdb.db.auth.AuthorityChecker;
3335
import org.apache.iotdb.db.conf.IoTDBConfig;
3436
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -195,13 +197,26 @@ public class Coordinator {
195197

196198
private static final Coordinator INSTANCE = new Coordinator();
197199

200+
private static final IMemoryBlock coordinatorMemoryBlock;
201+
198202
private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap;
199203

200204
private final StatementRewrite statementRewrite;
201205
private final List<PlanOptimizer> logicalPlanOptimizers;
202206
private final List<PlanOptimizer> distributionPlanOptimizers;
203207
private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier;
204208

209+
static {
210+
coordinatorMemoryBlock =
211+
IoTDBDescriptor.getInstance()
212+
.getMemoryConfig()
213+
.getCoordinatorMemoryManager()
214+
.exactAllocate("Coordinator", MemoryBlockType.DYNAMIC);
215+
LOGGER.debug(
216+
"Initialized shared MemoryBlock 'Coordinator' with all available memory: {} bytes",
217+
coordinatorMemoryBlock.getTotalMemorySizeInBytes());
218+
}
219+
205220
private Coordinator() {
206221
this.queryExecutionMap = new ConcurrentHashMap<>();
207222
this.executor = getQueryExecutor();
@@ -665,6 +680,10 @@ public static Coordinator getInstance() {
665680
return INSTANCE;
666681
}
667682

683+
public static IMemoryBlock getCoordinatorMemoryBlock() {
684+
return coordinatorMemoryBlock;
685+
}
686+
668687
public void recordExecutionTime(long queryId, long executionTime) {
669688
IQueryExecution queryExecution = getQueryExecution(queryId);
670689
if (queryExecution != null) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/DeallocateTask.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,28 +47,26 @@ public DeallocateTask(String statementName) {
4747
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
4848
throws InterruptedException {
4949
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
50-
try {
51-
IClientSession session = SessionManager.getInstance().getCurrSession();
52-
if (session == null) {
53-
future.setException(
54-
new IllegalStateException("No current session available for DEALLOCATE statement"));
55-
return future;
56-
}
57-
58-
// Remove the prepared statement
59-
PreparedStatementInfo removedInfo = session.removePreparedStatement(statementName);
60-
if (removedInfo == null) {
61-
throw new SemanticException(
62-
String.format("Prepared statement '%s' does not exist", statementName));
63-
} else {
64-
// Release the memory allocated for this PreparedStatement
65-
PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemoryBlock());
66-
}
50+
IClientSession session = SessionManager.getInstance().getCurrSession();
51+
if (session == null) {
52+
future.setException(
53+
new IllegalStateException("No current session available for DEALLOCATE statement"));
54+
return future;
55+
}
6756

68-
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
69-
} catch (Exception e) {
70-
future.setException(e);
57+
// Remove the prepared statement
58+
PreparedStatementInfo removedInfo = session.removePreparedStatement(statementName);
59+
if (removedInfo == null) {
60+
future.setException(
61+
new SemanticException(
62+
String.format("Prepared statement '%s' does not exist", statementName)));
63+
return future;
7164
}
65+
66+
// Release the memory allocated for this PreparedStatement from the shared MemoryBlock
67+
PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemorySizeInBytes());
68+
69+
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
7270
return future;
7371
}
7472
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/session/PrepareTask.java

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.queryengine.plan.execution.config.session;
2121

22-
import org.apache.iotdb.commons.memory.IMemoryBlock;
2322
import org.apache.iotdb.db.exception.sql.SemanticException;
2423
import org.apache.iotdb.db.protocol.session.IClientSession;
2524
import org.apache.iotdb.db.protocol.session.PreparedStatementInfo;
@@ -36,7 +35,7 @@
3635

3736
/**
3837
* Task for executing PREPARE statement. Stores the prepared statement AST in the session. The AST
39-
* is cached to avoid re-parsing on EXECUTE (skipping Parser phase). Memory is allocated from
38+
* is cached to avoid reparsing on EXECUTE (skipping Parser phase). Memory is allocated from
4039
* CoordinatorMemoryManager and shared across all sessions.
4140
*/
4241
public class PrepareTask implements IConfigTask {
@@ -53,42 +52,34 @@ public PrepareTask(String statementName, Statement sql) {
5352
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
5453
throws InterruptedException {
5554
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
56-
IMemoryBlock memoryBlock = null;
57-
try {
58-
IClientSession session = SessionManager.getInstance().getCurrSession();
59-
if (session == null) {
60-
future.setException(
61-
new IllegalStateException("No current session available for PREPARE statement"));
62-
return future;
63-
}
55+
IClientSession session = SessionManager.getInstance().getCurrSession();
56+
if (session == null) {
57+
future.setException(
58+
new IllegalStateException("No current session available for PREPARE statement"));
59+
return future;
60+
}
6461

65-
// Check if prepared statement with the same name already exists
66-
PreparedStatementInfo existingInfo = session.getPreparedStatement(statementName);
67-
if (existingInfo != null) {
68-
throw new SemanticException(
69-
String.format("Prepared statement '%s' already exists.", statementName));
70-
}
62+
// Check if prepared statement with the same name already exists
63+
PreparedStatementInfo existingInfo = session.getPreparedStatement(statementName);
64+
if (existingInfo != null) {
65+
future.setException(
66+
new SemanticException(
67+
String.format("Prepared statement '%s' already exists.", statementName)));
68+
return future;
69+
}
7170

72-
// Estimate memory size of the AST
73-
long memorySizeInBytes = AstMemoryEstimator.estimateMemorySize(sql);
71+
// Estimate memory size of the AST
72+
long memorySizeInBytes = AstMemoryEstimator.estimateMemorySize(sql);
7473

75-
// Allocate memory from CoordinatorMemoryManager
76-
// This memory is shared across all sessions
77-
memoryBlock =
78-
PreparedStatementMemoryManager.getInstance().allocate(statementName, memorySizeInBytes);
74+
// Allocate memory from CoordinatorMemoryManager
75+
// This memory is shared across all sessions using a single MemoryBlock
76+
PreparedStatementMemoryManager.getInstance().allocate(statementName, memorySizeInBytes);
7977

80-
// Create and store the prepared statement info (AST is cached)
81-
PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memoryBlock);
82-
session.addPreparedStatement(statementName, info);
78+
// Create and store the prepared statement info (AST is cached)
79+
PreparedStatementInfo info = new PreparedStatementInfo(statementName, sql, memorySizeInBytes);
80+
session.addPreparedStatement(statementName, info);
8381

84-
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
85-
} catch (Exception e) {
86-
// If memory allocation succeeded but something else failed, release the memory
87-
if (memoryBlock != null) {
88-
PreparedStatementMemoryManager.getInstance().release(memoryBlock);
89-
}
90-
future.setException(e);
91-
}
82+
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
9283
return future;
9384
}
9485
}

0 commit comments

Comments
 (0)