Commit bce61f5

Fix CometShuffleManager hang by deferring SparkEnv access (#3002)
1 parent: dca45ea · commit: bce61f5

File tree: 2 files changed, +31 −18 lines

.github/workflows/spark_sql_test.yml

Lines changed: 0 additions & 4 deletions
@@ -59,10 +59,6 @@ jobs:
           - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
           - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
           - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
-        # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
-        exclude:
-          - spark-version: {short: '4.0', full: '4.0.1', java: 17}
-            module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
       fail-fast: false
     name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}
     runs-on: ${{ matrix.os }}

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala

Lines changed: 31 additions & 14 deletions
@@ -58,7 +58,37 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
    */
   private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
 
-  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
+  // Lazy initialization to avoid accessing SparkEnv.get during ShuffleManager construction,
+  // which can cause hangs when SparkEnv is not fully initialized (e.g., during Hive metastore ops)
+  // This is only initialized when getWriter/getReader is called (during task execution),
+  // at which point SparkEnv should be fully available
+  @volatile private var _shuffleExecutorComponents: ShuffleExecutorComponents = _
+
+  private def shuffleExecutorComponents: ShuffleExecutorComponents = {
+    if (_shuffleExecutorComponents == null) {
+      synchronized {
+        if (_shuffleExecutorComponents == null) {
+          val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+          val extraConfigs =
+            conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
+          // SparkEnv.get should be available when getWriter/getReader is called
+          // (during task execution), but check for null to avoid hangs
+          val env = SparkEnv.get
+          if (env == null) {
+            throw new IllegalStateException(
+              "SparkEnv.get is null during shuffleExecutorComponents initialization. " +
+                "This may indicate a timing issue with SparkEnv initialization.")
+          }
+          executorComponents.initializeExecutor(
+            conf.getAppId,
+            env.executorId,
+            extraConfigs.asJava)
+          _shuffleExecutorComponents = executorComponents
+        }
+      }
+    }
+    _shuffleExecutorComponents
+  }
 
   override val shuffleBlockResolver: IndexShuffleBlockResolver = {
     // The patch versions of Spark 3.4 have different constructor signatures:
@@ -253,19 +283,6 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
 
 object CometShuffleManager extends Logging {
 
-  /**
-   * Loads executor components for shuffle data IO.
-   */
-  private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
-    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
-    executorComponents.initializeExecutor(
-      conf.getAppId,
-      SparkEnv.get.executorId,
-      extraConfigs.asJava)
-    executorComponents
-  }
-
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
     if (dep.mapSideCombine) {
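
Note: the new accessor above follows the double-checked-locking idiom (a @volatile field, a fast null check outside the lock, and a second check inside synchronized) so that ShuffleExecutorComponents is built at most once, and only when first requested. The following minimal, self-contained sketch illustrates that idiom in isolation. ExpensiveResource, Holder, and buildResource are hypothetical names introduced here for illustration; they are not part of the Comet codebase, where the deferred step instead builds ShuffleExecutorComponents and reads SparkEnv.get.

// Minimal sketch of the double-checked-locking lazy initialization idiom used above.
// ExpensiveResource, Holder, and buildResource are hypothetical illustration names.
final class ExpensiveResource(val id: String)

class Holder {
  // @volatile publishes the fully constructed object to other threads without locking.
  @volatile private var _resource: ExpensiveResource = _

  def resource: ExpensiveResource = {
    if (_resource == null) {            // fast path: no lock taken once initialized
      synchronized {
        if (_resource == null) {        // second check: exactly one thread initializes
          _resource = buildResource()   // deferred until first real use
        }
      }
    }
    _resource
  }

  // In CometShuffleManager the analogous step reads SparkEnv.get; deferring it to the
  // first getWriter/getReader call means SparkEnv is already fully initialized.
  private def buildResource(): ExpensiveResource =
    new ExpensiveResource("initialized-on-first-use")
}

object Holder {
  def main(args: Array[String]): Unit = {
    val h = new Holder
    println(h.resource.id) // triggers initialization exactly once
  }
}

A Scala lazy val also memoizes under a lock; the explicit form in the commit makes the initialization point visible and adds a clear failure message if SparkEnv is still unavailable at first use, rather than hanging.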
