Skip to content

Commit 6cec3c3

Browse files
ericm-db authored and HyukjinKwon committed
[SPARK-54675][SPARK-54655][SS] Add configurable force shutdown timeout for StateStore maintenance thread pool
### What changes were proposed in this pull request?

This PR introduces a new configuration parameter `spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout` to control the timeout for force shutdown operations in the StateStore maintenance thread pool.

Previously, the force shutdown timeout was hardcoded to 60 seconds in the `MaintenanceThreadPool.stop()` method. This PR makes it configurable to allow better control over graceful vs. forceful shutdown behavior in different deployment scenarios.

### Why are the changes needed?

Different streaming workloads and operational environments may require different force shutdown timeout values:

- Some environments may need longer timeouts to allow in-flight maintenance operations to complete properly.
- Other environments may prefer shorter timeouts to speed up shutdown sequences.
- Making this configurable provides operators with more control over the tradeoff between clean shutdown and shutdown speed.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing tests continue to pass.
- The default value maintains backward compatibility with the previous hardcoded behavior.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53432 from ericm-db/state-store-shutdown-config.

Authored-by: ericm-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 19a1da9 commit 6cec3c3

File tree

5 files changed

+38
-3
lines changed

5 files changed

+38
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2553,6 +2553,15 @@ object SQLConf {
25532553
.timeConf(TimeUnit.SECONDS)
25542554
.createWithDefault(300L)
25552555

2556+
val STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT =
2557+
buildConf("spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout")
2558+
.internal()
2559+
.doc("Timeout in seconds to wait for tasks to respond to cancellation after " +
2560+
"force shutdown is initiated. This applies after the graceful shutdown timeout " +
2561+
"has been exceeded.")
2562+
.timeConf(TimeUnit.SECONDS)
2563+
.createWithDefault(60L)
2564+
25562565
val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
25572566
buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
25582567
.internal()
@@ -6921,6 +6930,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
69216930

69226931
def stateStoreMaintenanceShutdownTimeout: Long = getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
69236932

6933+
def stateStoreMaintenanceForceShutdownTimeout: Long =
6934+
getConf(STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT)
6935+
69246936
def stateStoreMaintenanceProcessingTimeout: Long =
69256937
getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT)
69266938

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,8 @@ object StateStore extends Logging {
11301130
*/
11311131
class MaintenanceThreadPool(
11321132
numThreads: Int,
1133-
shutdownTimeout: Long) {
1133+
shutdownTimeout: Long,
1134+
forceShutdownTimeout: Long) {
11341135
private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
11351136
numThreads, "state-store-maintenance-thread")
11361137

@@ -1151,7 +1152,7 @@ object StateStore extends Logging {
11511152
threadPool.shutdownNow() // Cancel currently executing tasks
11521153

11531154
// Wait a while for tasks to respond to being cancelled
1154-
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
1155+
if (!threadPool.awaitTermination(forceShutdownTimeout, TimeUnit.SECONDS)) {
11551156
logError("MaintenanceThreadPool did not terminate")
11561157
}
11571158
}
@@ -1416,14 +1417,15 @@ object StateStore extends Logging {
14161417
private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
14171418
val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
14181419
val maintenanceShutdownTimeout = storeConf.stateStoreMaintenanceShutdownTimeout
1420+
val maintenanceForceShutdownTimeout = storeConf.stateStoreMaintenanceForceShutdownTimeout
14191421
loadedProviders.synchronized {
14201422
if (SparkEnv.get != null && !isMaintenanceRunning && !storeConf.unloadOnCommit) {
14211423
maintenanceTask = new MaintenanceTask(
14221424
storeConf.maintenanceInterval,
14231425
task = { doMaintenance(storeConf) }
14241426
)
14251427
maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads,
1426-
maintenanceShutdownTimeout)
1428+
maintenanceShutdownTimeout, maintenanceForceShutdownTimeout)
14271429
logInfo("State Store maintenance task started")
14281430
}
14291431
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ class StateStoreConf(
4040
*/
4141
val stateStoreMaintenanceShutdownTimeout: Long = sqlConf.stateStoreMaintenanceShutdownTimeout
4242

43+
/**
44+
* Timeout to wait for tasks to respond to cancellation after force shutdown is initiated
45+
*/
46+
val stateStoreMaintenanceForceShutdownTimeout: Long =
47+
sqlConf.stateStoreMaintenanceForceShutdownTimeout
48+
4349
val stateStoreMaintenanceProcessingTimeout: Long = sqlConf.stateStoreMaintenanceProcessingTimeout
4450

4551
/**

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
127127
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
128128
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
129129
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
130+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
130131
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
131132
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
132133
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
@@ -433,6 +434,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
433434
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
434435
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
435436
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
437+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
436438
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
437439
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
438440
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
@@ -468,6 +470,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
468470
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
469471
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
470472
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
473+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
471474
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
472475
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
473476
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
@@ -511,6 +514,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
511514
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
512515
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
513516
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
517+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
514518
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
515519
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
516520
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -578,6 +582,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
578582
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
579583
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
580584
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
585+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
581586
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
582587
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
583588
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false",
@@ -621,6 +626,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
621626
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
622627
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
623628
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
629+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
624630
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
625631
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
626632
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -666,6 +672,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
666672
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
667673
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
668674
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
675+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
669676
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
670677
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
671678
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
@@ -787,6 +794,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
787794
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
788795
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
789796
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
797+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
790798
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
791799
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
792800
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
@@ -866,6 +874,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
866874
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
867875
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
868876
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
877+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
869878
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
870879
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
871880
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
@@ -941,6 +950,7 @@ object StateStoreCoordinatorSuite {
941950
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
942951
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
943952
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
953+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
944954
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
945955
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
946956
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
7575
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
7676
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
7777
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
78+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
7879
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
7980
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
8081
) {
@@ -139,6 +140,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
139140
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
140141
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
141142
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
143+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
142144
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
143145
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
144146
) {
@@ -214,6 +216,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
214216
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
215217
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
216218
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
219+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
217220
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
218221
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
219222
SQLConf.SHUFFLE_PARTITIONS.key -> "3"
@@ -291,6 +294,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
291294
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
292295
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
293296
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
297+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
294298
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
295299
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
296300
) {
@@ -365,6 +369,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
365369
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
366370
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
367371
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
372+
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
368373
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
369374
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4",
370375
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"

0 commit comments

Comments
 (0)