Skip to content

Commit 8b05829

Browse files
committed
fix bugs
1 parent 9490f9f commit 8b05829

File tree

5 files changed

+70
-56
lines changed

5 files changed

+70
-56
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -96,38 +96,56 @@ private static void executePreparedStatementAndVerify(
9696
throws SQLException {
9797
// Execute with parameters using write connection directly
9898
// In cluster test, we need to use write connection to ensure same session
99-
Statement writeStatement;
10099
if (connection instanceof ClusterTestConnection) {
101100
// Use write connection directly for PreparedStatement queries
102-
writeStatement =
103-
((ClusterTestConnection) connection)
104-
.writeConnection
105-
.getUnderlyingConnection()
106-
.createStatement();
107-
} else {
108-
writeStatement = statement;
109-
}
101+
try (Statement writeStatement =
102+
((ClusterTestConnection) connection)
103+
.writeConnection
104+
.getUnderlyingConnection()
105+
.createStatement();
106+
ResultSet resultSet = writeStatement.executeQuery(executeSql)) {
107+
ResultSetMetaData metaData = resultSet.getMetaData();
110108

111-
try (ResultSet resultSet = writeStatement.executeQuery(executeSql)) {
112-
ResultSetMetaData metaData = resultSet.getMetaData();
109+
// Verify header
110+
assertEquals(expectedHeader.length, metaData.getColumnCount());
111+
for (int i = 1; i <= metaData.getColumnCount(); i++) {
112+
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
113+
}
113114

114-
// Verify header
115-
assertEquals(expectedHeader.length, metaData.getColumnCount());
116-
for (int i = 1; i <= metaData.getColumnCount(); i++) {
117-
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
115+
// Verify data
116+
int cnt = 0;
117+
while (resultSet.next()) {
118+
StringBuilder builder = new StringBuilder();
119+
for (int i = 1; i <= expectedHeader.length; i++) {
120+
builder.append(resultSet.getString(i)).append(",");
121+
}
122+
assertEquals(expectedRetArray[cnt], builder.toString());
123+
cnt++;
124+
}
125+
assertEquals(expectedRetArray.length, cnt);
118126
}
127+
} else {
128+
try (ResultSet resultSet = statement.executeQuery(executeSql)) {
129+
ResultSetMetaData metaData = resultSet.getMetaData();
130+
131+
// Verify header
132+
assertEquals(expectedHeader.length, metaData.getColumnCount());
133+
for (int i = 1; i <= metaData.getColumnCount(); i++) {
134+
assertEquals(expectedHeader[i - 1], metaData.getColumnName(i));
135+
}
119136

120-
// Verify data
121-
int cnt = 0;
122-
while (resultSet.next()) {
123-
StringBuilder builder = new StringBuilder();
124-
for (int i = 1; i <= expectedHeader.length; i++) {
125-
builder.append(resultSet.getString(i)).append(",");
137+
// Verify data
138+
int cnt = 0;
139+
while (resultSet.next()) {
140+
StringBuilder builder = new StringBuilder();
141+
for (int i = 1; i <= expectedHeader.length; i++) {
142+
builder.append(resultSet.getString(i)).append(",");
143+
}
144+
assertEquals(expectedRetArray[cnt], builder.toString());
145+
cnt++;
126146
}
127-
assertEquals(expectedRetArray[cnt], builder.toString());
128-
cnt++;
147+
assertEquals(expectedRetArray.length, cnt);
129148
}
130-
assertEquals(expectedRetArray.length, cnt);
131149
}
132150
}
133151

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ public class InternalClientSession extends IClientSession {
3535

3636
private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
3737

38-
// Map from statement name to PreparedStatementInfo
39-
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();
40-
4138
public InternalClientSession(String clientID) {
4239
this.clientID = clientID;
4340
}
@@ -94,21 +91,25 @@ public void removeQueryId(Long statementId, Long queryId) {
9491

9592
@Override
9693
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
97-
preparedStatements.put(statementName, info);
94+
throw new UnsupportedOperationException(
95+
"InternalClientSession should never call PREPARE statement methods.");
9896
}
9997

10098
@Override
10199
public PreparedStatementInfo removePreparedStatement(String statementName) {
102-
return preparedStatements.remove(statementName);
100+
throw new UnsupportedOperationException(
101+
"InternalClientSession should never call PREPARE statement methods.");
103102
}
104103

105104
@Override
106105
public PreparedStatementInfo getPreparedStatement(String statementName) {
107-
return preparedStatements.get(statementName);
106+
throw new UnsupportedOperationException(
107+
"InternalClientSession should never call PREPARE statement methods.");
108108
}
109109

110110
@Override
111111
public Set<String> getPreparedStatementNames() {
112-
return preparedStatements.keySet();
112+
throw new UnsupportedOperationException(
113+
"InternalClientSession should never call PREPARE statement methods.");
113114
}
114115
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,12 @@
2222
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
2323

2424
import java.util.Collections;
25-
import java.util.Map;
2625
import java.util.Set;
27-
import java.util.concurrent.ConcurrentHashMap;
2826

2927
public class MqttClientSession extends IClientSession {
3028

3129
private final String clientID;
3230

33-
// Map from statement name to PreparedStatementInfo
34-
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();
35-
3631
public MqttClientSession(String clientID) {
3732
this.clientID = clientID;
3833
}
@@ -84,21 +79,25 @@ public void removeQueryId(Long statementId, Long queryId) {
8479

8580
@Override
8681
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
87-
preparedStatements.put(statementName, info);
82+
throw new UnsupportedOperationException(
83+
"MQTT client session does not support PREPARE statement.");
8884
}
8985

9086
@Override
9187
public PreparedStatementInfo removePreparedStatement(String statementName) {
92-
return preparedStatements.remove(statementName);
88+
throw new UnsupportedOperationException(
89+
"MQTT client session does not support PREPARE statement.");
9390
}
9491

9592
@Override
9693
public PreparedStatementInfo getPreparedStatement(String statementName) {
97-
return preparedStatements.get(statementName);
94+
throw new UnsupportedOperationException(
95+
"MQTT client session does not support PREPARE statement.");
9896
}
9997

10098
@Override
10199
public Set<String> getPreparedStatementNames() {
102-
return preparedStatements.keySet();
100+
throw new UnsupportedOperationException(
101+
"MQTT client session does not support PREPARE statement.");
103102
}
104103
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,18 @@ public void closeStatement(
361361
}
362362
}
363363
session.removeStatementId(statementId);
364+
365+
// Release PreparedStatement resources when statement is closed
366+
try {
367+
PreparedStatementMemoryManager.getInstance().releaseAllForSession(session);
368+
} catch (Exception e) {
369+
LOGGER.warn(
370+
"Failed to release PreparedStatement resources when closing statement {} for session {}: {}",
371+
statementId,
372+
session,
373+
e.getMessage(),
374+
e);
375+
}
364376
}
365377

366378
public long requestQueryId(IClientSession session, Long statementId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
package org.apache.iotdb.db.protocol.thrift.handler;
2121

2222
import org.apache.iotdb.db.protocol.session.ClientSession;
23-
import org.apache.iotdb.db.protocol.session.IClientSession;
2423
import org.apache.iotdb.db.protocol.session.SessionManager;
25-
import org.apache.iotdb.db.queryengine.plan.Coordinator;
2624
import org.apache.iotdb.external.api.thrift.JudgableServerContext;
2725
import org.apache.iotdb.external.api.thrift.ServerContextFactory;
2826
import org.apache.iotdb.rpc.TElasticFramedTransport;
@@ -72,20 +70,6 @@ public ServerContext createContext(TProtocol in, TProtocol out) {
7270
}
7371

7472
public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
75-
IClientSession session = getSessionManager().getCurrSession();
76-
77-
// Release session resources (including PreparedStatement memory)
78-
// This handles TCP connection loss scenarios
79-
if (session != null) {
80-
try {
81-
getSessionManager().closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
82-
} catch (Exception e) {
83-
logger.warn(
84-
"Failed to close session during TCP connection disconnect: {}", e.getMessage(), e);
85-
}
86-
}
87-
88-
// Remove the session from the current thread
8973
getSessionManager().removeCurrSession();
9074

9175
if (context != null && factory != null) {

0 commit comments

Comments
 (0)