Skip to content

Commit db36f74

Browse files
committed
[SPARK-54815][CONNECT] Do not close the class loader of the session state if session is still in use
### What changes were proposed in this pull request? This is a followup of #53233 . When session state is evicted from `Executor#isolatedSessionCache`, the session may still being used by the running tasks. This PR adds ref counting and skips closing class loader if the session is still in use. ### Why are the changes needed? closing class loader may break running tasks ### Does this PR introduce _any_ user-facing change? Yes, previously long running tasks may fail, and now it's fixed. ### How was this patch tested? existing tests. It's hard to construct a long running task and test the class loader behavior, but this fix is quite obvious. ### Was this patch authored or co-authored using generative AI tooling? cursor 2.2.20 Closes #53569 from cloud-fan/cache. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent d65ee81 commit db36f74

File tree

1 file changed

+66
-27
lines changed

1 file changed

+66
-27
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.net.{URI, URL, URLClassLoader}
2424
import java.nio.ByteBuffer
2525
import java.util.{Locale, Properties}
2626
import java.util.concurrent._
27-
import java.util.concurrent.atomic.AtomicBoolean
27+
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
2828
import java.util.concurrent.locks.ReentrantLock
2929
import javax.annotation.concurrent.GuardedBy
3030

@@ -59,13 +59,63 @@ import org.apache.spark.util._
5959
import org.apache.spark.util.ArrayImplicits._
6060

6161
private[spark] class IsolatedSessionState(
62-
val sessionUUID: String,
63-
var urlClassLoader: MutableURLClassLoader,
64-
var replClassLoader: ClassLoader,
65-
val currentFiles: HashMap[String, Long],
66-
val currentJars: HashMap[String, Long],
67-
val currentArchives: HashMap[String, Long],
68-
val replClassDirUri: Option[String])
62+
val sessionUUID: String,
63+
var urlClassLoader: MutableURLClassLoader,
64+
var replClassLoader: ClassLoader,
65+
val currentFiles: HashMap[String, Long],
66+
val currentJars: HashMap[String, Long],
67+
val currentArchives: HashMap[String, Long],
68+
val replClassDirUri: Option[String]) extends Logging {
69+
70+
// Reference count for the number of running tasks using this session.
71+
private val refCount: AtomicInteger = new AtomicInteger(0)
72+
73+
// Whether this session has been evicted from the cache.
74+
@volatile private var evicted: Boolean = false
75+
76+
/** Increment the reference count, indicating a task is using this session. */
77+
def acquire(): Unit = refCount.incrementAndGet()
78+
79+
/** Decrement the reference count. If evicted and no more tasks, clean up. */
80+
def release(): Unit = {
81+
if (refCount.decrementAndGet() == 0 && evicted) {
82+
cleanup()
83+
}
84+
}
85+
86+
/** Mark this session as evicted. If no tasks are using it, clean up immediately. */
87+
def markEvicted(): Unit = {
88+
evicted = true
89+
if (refCount.get() == 0) {
90+
cleanup()
91+
} else {
92+
logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} evicted but still in use by " +
93+
log"${MDC(LogKeys.COUNT, refCount.get())} task(s), deferring cleanup")
94+
}
95+
}
96+
97+
private def cleanup(): Unit = {
98+
// Close the urlClassLoader to release resources.
99+
try {
100+
urlClassLoader match {
101+
case cl: URLClassLoader =>
102+
cl.close()
103+
logInfo(log"Closed urlClassLoader for session ${MDC(SESSION_ID, sessionUUID)}")
104+
case _ =>
105+
}
106+
} catch {
107+
case NonFatal(e) =>
108+
logWarning(log"Failed to close urlClassLoader for session " +
109+
log"${MDC(SESSION_ID, sessionUUID)}", e)
110+
}
111+
// Delete session files.
112+
val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), sessionUUID)
113+
if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
114+
Utils.deleteRecursively(sessionBasedRoot)
115+
}
116+
logInfo(log"Session cleaned up: ${MDC(SESSION_ID, sessionUUID)}")
117+
}
118+
}
69119

70120
/**
71121
* Spark executor, backed by a threadpool to run tasks.
@@ -220,25 +270,9 @@ private[spark] class Executor(
220270
val state = notification.getValue
221271
// Cache is always used for isolated sessions.
222272
assert(!isDefaultState(state.sessionUUID))
223-
// Close the urlClassLoader to release resources.
224-
try {
225-
state.urlClassLoader match {
226-
case urlClassLoader: URLClassLoader =>
227-
urlClassLoader.close()
228-
logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted session " +
229-
log"${MDC(SESSION_ID, state.sessionUUID)}")
230-
case _ =>
231-
}
232-
} catch {
233-
case NonFatal(e) =>
234-
logWarning(log"Failed to close urlClassLoader for session " +
235-
log"${MDC(SESSION_ID, state.sessionUUID)}", e)
236-
}
237-
val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), state.sessionUUID)
238-
if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
239-
Utils.deleteRecursively(sessionBasedRoot)
240-
}
241-
logInfo(log"Session evicted: ${MDC(SESSION_ID, state.sessionUUID)}")
273+
// Mark evicted - cleanup will happen immediately if no tasks are using it,
274+
// or when the last task releases it.
275+
state.markEvicted()
242276
}
243277
})
244278
.build[String, IsolatedSessionState]
@@ -600,6 +634,9 @@ private[spark] class Executor(
600634
case _ => defaultSessionState
601635
}
602636

637+
// Pin the session to prevent its class loader from being closed while this task is running.
638+
isolatedSession.acquire()
639+
603640
setMDCForTask(taskName, mdcProperties)
604641
threadId = Thread.currentThread.getId
605642
Thread.currentThread.setName(threadName)
@@ -905,6 +942,8 @@ private[spark] class Executor(
905942
// are known, and metricsPoller.onTaskStart was called.
906943
metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
907944
}
945+
// Release the session reference. If evicted and this was the last task, cleanup happens.
946+
isolatedSession.release()
908947
}
909948
}
910949

0 commit comments

Comments
 (0)