Skip to content

Commit d151dce

Browse files
ruanwenjunpan3793
authored andcommitted
[KYUUBI #7290] Prevent engine session leak after Kyuubi session closed
### Why are the changes needed? close #7290 When we close a kyuubi session, if the engine session(thrift client) has not initialized, then the kyuubi session will be closed, but the engine session might be alive, then will result in a resource leak. ### How was this patch tested? Can be tested by when the engine pod is pending, and then kill the jdbc client, the driver pod should be killed after initialized. ### Was this patch authored or co-authored using generative AI tooling? NO Closes #7294 from ruanwenjun/dev_wenjun_fix7290. Closes #7290 142a210 [ruanwenjun] make sure the client closed 9745206 [ruanwenjun] set shouldRetry to false d3c8608 [ruanwenjun] remove unused comment 866126d [ruanwenjun] If the session already closed, then return 3b2ee39 [ruanwenjun] [KYUUBI #7290] Fix the engine session might still alive when kyuubi session has been closed Authored-by: ruanwenjun <[email protected]> Signed-off-by: Cheng Pan <[email protected]> (cherry picked from commit 8bed9de) Signed-off-by: Cheng Pan <[email protected]>
1 parent d8a4059 commit d151dce

File tree

3 files changed

+17
-1
lines changed

3 files changed

+17
-1
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ abstract class AbstractSession(
5252
@volatile private var _lastIdleTime: Long = _createTime
5353
override def lastIdleTime: Long = _lastIdleTime
5454

55+
@volatile private var _closed: Boolean = false
56+
5557
override def getNoOperationTime: Long = {
5658
if (lastIdleTime > 0) System.currentTimeMillis() - _lastIdleTime else 0
5759
}
@@ -87,6 +89,11 @@ abstract class AbstractSession(
8789
}
8890

8991
override def close(): Unit = withAcquireRelease() {
92+
if (_closed) {
93+
return
94+
}
95+
_closed = true
96+
info(s"Mark session $handle closed")
9097
opHandleSet.forEach { opHandle =>
9198
try {
9299
sessionManager.operationManager.closeOperation(opHandle)
@@ -95,8 +102,11 @@ abstract class AbstractSession(
95102
warn(s"Error closing operation $opHandle during closing $handle for", e)
96103
}
97104
}
105+
info(s"Closed all operations for session $handle")
98106
}
99107

108+
override def isClosed: Boolean = _closed
109+
100110
protected def runOperation(operation: Operation): OperationHandle = {
101111
try {
102112
val opHandle = operation.getHandle

kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ trait Session {
4444
def open(): Unit
4545
def close(): Unit
4646

47+
def isClosed: Boolean
48+
4749
def getInfo(infoType: TGetInfoType): TGetInfoValue
4850

4951
def executeStatement(

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ class KyuubiSessionImpl(
180180
_engineSessionHandle =
181181
engineClient.openSession(protocol, user, passwd, openEngineSessionConf)
182182
_client = engineClient
183+
if (isClosed) {
184+
shouldRetry = false
185+
throw KyuubiSQLException(s"KyuubiSession $handle has been closed")
186+
}
183187
logSessionInfo(s"Connected to engine [$host:$port]/[${client.engineId.getOrElse("")}]" +
184188
s" with ${_engineSessionHandle}]")
185189
shouldRetry = false
@@ -210,7 +214,7 @@ class KyuubiSessionImpl(
210214
throw e
211215
} finally {
212216
attempt += 1
213-
if (shouldRetry && engineClient != null) {
217+
if ((isClosed || shouldRetry) && engineClient != null) {
214218
try {
215219
engineClient.closeSession()
216220
} catch {

0 commit comments

Comments
 (0)