Skip to content

Commit 8a79fa3

Browse files
authored
Load: Fix the Session exit issue after type conversion (#16037)
* Load: Fix the Session exit issue after type conversion * fix
1 parent fb3a69c commit 8a79fa3

File tree

2 files changed

+67
-24
lines changed

2 files changed

+67
-24
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,16 @@ public SessionInfo copySessionInfoForTreeModel(final SessionInfo sessionInfo) {
411411
IClientSession.SqlDialect.TREE);
412412
}
413413

414+
public SessionInfo getSessionInfoOfTreeModel(IClientSession session) {
415+
return new SessionInfo(
416+
session.getId(),
417+
session.getUsername(),
418+
ZoneId.systemDefault(),
419+
session.getClientVersion(),
420+
session.getDatabaseName(),
421+
IClientSession.SqlDialect.TREE);
422+
}
423+
414424
public SessionInfo getSessionInfoOfTableModel(IClientSession session) {
415425
return new SessionInfo(
416426
session.getId(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTsFileDataTypeConverter.java

Lines changed: 57 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -89,18 +89,24 @@ public Optional<TSStatus> convertForTableModel(final LoadTsFile loadTsFileTableS
8989
}
9090

9191
private TSStatus executeForTableModel(final Statement statement, final String databaseName) {
92-
final IClientSession session =
93-
new InternalClientSession(
94-
String.format(
95-
"%s_%s",
96-
LoadTsFileDataTypeConverter.class.getSimpleName(),
97-
Thread.currentThread().getName()));
98-
session.setUsername(AuthorityChecker.SUPER_USER);
99-
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
100-
session.setZoneId(ZoneId.systemDefault());
101-
session.setSqlDialect(IClientSession.SqlDialect.TABLE);
102-
103-
SESSION_MANAGER.registerSession(session);
92+
final IClientSession session;
93+
final boolean needToCreateSession = SESSION_MANAGER.getCurrSession() == null;
94+
if (needToCreateSession) {
95+
session =
96+
new InternalClientSession(
97+
String.format(
98+
"%s_%s",
99+
LoadTsFileDataTypeConverter.class.getSimpleName(),
100+
Thread.currentThread().getName()));
101+
session.setUsername(AuthorityChecker.SUPER_USER);
102+
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
103+
session.setZoneId(ZoneId.systemDefault());
104+
session.setSqlDialect(IClientSession.SqlDialect.TABLE);
105+
106+
SESSION_MANAGER.registerSession(session);
107+
} else {
108+
session = SESSION_MANAGER.getCurrSession();
109+
}
104110
try {
105111
return Coordinator.getInstance()
106112
.executeForTableModel(
@@ -114,7 +120,9 @@ private TSStatus executeForTableModel(final Statement statement, final String da
114120
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
115121
.status;
116122
} finally {
117-
SESSION_MANAGER.removeCurrSession();
123+
if (needToCreateSession) {
124+
SESSION_MANAGER.removeCurrSession();
125+
}
118126
}
119127
}
120128

@@ -134,17 +142,42 @@ public Optional<TSStatus> convertForTreeModel(final LoadTsFileStatement loadTsFi
134142
}
135143

136144
private TSStatus executeForTreeModel(final Statement statement) {
137-
return Coordinator.getInstance()
138-
.executeForTreeModel(
139-
isGeneratedByPipe ? new PipeEnrichedStatement(statement) : statement,
140-
SESSION_MANAGER.requestQueryId(),
141-
SESSION_MANAGER.getSessionInfo(SESSION_MANAGER.getCurrSession()),
142-
"",
143-
ClusterPartitionFetcher.getInstance(),
144-
ClusterSchemaFetcher.getInstance(),
145-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
146-
false)
147-
.status;
145+
final IClientSession session;
146+
final boolean needToCreateSession = SESSION_MANAGER.getCurrSession() == null;
147+
if (needToCreateSession) {
148+
session =
149+
new InternalClientSession(
150+
String.format(
151+
"%s_%s",
152+
LoadTsFileDataTypeConverter.class.getSimpleName(),
153+
Thread.currentThread().getName()));
154+
session.setUsername(AuthorityChecker.SUPER_USER);
155+
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
156+
session.setZoneId(ZoneId.systemDefault());
157+
session.setSqlDialect(IClientSession.SqlDialect.TREE);
158+
159+
SESSION_MANAGER.registerSession(session);
160+
} else {
161+
session = SESSION_MANAGER.getCurrSession();
162+
}
163+
164+
try {
165+
return Coordinator.getInstance()
166+
.executeForTreeModel(
167+
isGeneratedByPipe ? new PipeEnrichedStatement(statement) : statement,
168+
SESSION_MANAGER.requestQueryId(),
169+
SESSION_MANAGER.getSessionInfoOfTreeModel(session),
170+
"",
171+
ClusterPartitionFetcher.getInstance(),
172+
ClusterSchemaFetcher.getInstance(),
173+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
174+
false)
175+
.status;
176+
} finally {
177+
if (needToCreateSession) {
178+
SESSION_MANAGER.removeCurrSession();
179+
}
180+
}
148181
}
149182

150183
public boolean isSuccessful(final TSStatus status) {

0 commit comments

Comments
 (0)