
Commit 71a6656

remove conf
1 parent e68ce5a commit 71a6656

5 files changed: +7 additions, -56 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 0 additions & 13 deletions
@@ -2584,16 +2584,6 @@ object SQLConf {
       .createWithDefault(
         "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
 
-  val STREAMING_CHECKPOINT_STATE_CREATE_METADATA_DIR_ON_READ =
-    buildConf("spark.sql.streaming.checkpoint.stateCreateMetadataDirOnRead")
-      .internal()
-      .doc(
-        "When true, the state data source reader will create metadata directories if they " +
-        "don't exist. When false, the reader will only read from existing directories.")
-      .version("4.2.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val NUM_STATE_STORE_MAINTENANCE_THREADS =
     buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
       .internal()
@@ -7098,9 +7088,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
 
-  def stateStoreCreateMetadataDirOnRead: Boolean =
-    getConf(STREAMING_CHECKPOINT_STATE_CREATE_METADATA_DIR_ON_READ)
-
   def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
 
   def numStateStoreMaintenanceThreads: Int = getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 1 addition & 2 deletions
@@ -378,9 +378,8 @@ class StateDataSource extends TableProvider with DataSourceRegister with Logging
     val storeId = new StateStoreId(stateCheckpointLocation.toString, sourceOptions.operatorId,
       partitionId, sourceOptions.storeName)
     val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
-    val createSchemaDir = session.sessionState.conf.stateStoreCreateMetadataDirOnRead
     val manager = new StateSchemaCompatibilityChecker(providerId, hadoopConf,
-      oldSchemaFilePaths = oldSchemaFilePaths, createSchemaDir = createSchemaDir)
+      oldSchemaFilePaths = oldSchemaFilePaths, createSchemaDir = false)
     val stateSchema = manager.readSchemaFile()
 
     if (sourceOptions.internalOnlyReadAllColumnFamilies) {
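
Note: with createSchemaDir hardcoded to false, this read path can no longer create schema directories. For context, a minimal sketch of a batch read that exercises it, assuming an existing checkpoint (the path and option values below are hypothetical):

    // Read operator 0's default state store from an existing checkpoint.
    // The reader only opens existing schema/metadata files; it never calls mkdirs.
    val stateDf = spark.read
      .format("statestore")
      .option("path", "/tmp/checkpoints/agg-query") // hypothetical checkpoint location
      .option("operatorId", "0")                    // defaults to 0 when omitted
      .load()
    stateDf.show() // rows of (key, value, partition_id)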

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala

Lines changed: 3 additions & 5 deletions
@@ -79,8 +79,6 @@ object StreamStreamJoinStateHelper {
     // KeyToNumValuesType, KeyWithIndexToValueType
     val storeNames = SymmetricHashJoinStateManager.allStateStoreNames(side).toList
 
-    val createSchemaDir = session.sessionState.conf.stateStoreCreateMetadataDirOnRead
-
     val (keySchema, valueSchema) =
       if (!usesVirtualColumnFamilies(
         newHadoopConf, stateCheckpointLocation, operatorId)) {
@@ -97,12 +95,12 @@
         // read the key schema from the keyToNumValues store for the join keys
         val manager = new StateSchemaCompatibilityChecker(
           providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths,
-          createSchemaDir = createSchemaDir)
+          createSchemaDir = false)
         val kSchema = manager.readSchemaFile().head.keySchema
 
         // read the value schema from the keyWithIndexToValue store for the values
         val manager2 = new StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
-          newHadoopConf, oldSchemaFilePaths, createSchemaDir = createSchemaDir)
+          newHadoopConf, oldSchemaFilePaths, createSchemaDir = false)
         val vSchema = manager2.readSchemaFile().head.valueSchema
 
         (kSchema, vSchema)
@@ -112,7 +110,7 @@
         val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
 
         val manager = new StateSchemaCompatibilityChecker(
-          providerId, newHadoopConf, oldSchemaFilePaths, createSchemaDir = createSchemaDir)
+          providerId, newHadoopConf, oldSchemaFilePaths, createSchemaDir = false)
         val kSchema = manager.readSchemaFile().find { schema =>
           schema.colFamilyName == storeNames(0)
         }.map(_.keySchema).get
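
Note: for stream-stream joins, a reader typically selects a side rather than naming the underlying stores; the helper above then resolves the key schema from the keyToNumValues store and the value schema from the keyWithIndexToValue store. A sketch with a hypothetical path:

    // Read the left side's join state via the joinSide option ("left" or "right").
    val joinStateDf = spark.read
      .format("statestore")
      .option("path", "/tmp/checkpoints/join-query") // hypothetical
      .option("joinSide", "left")
      .load()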

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala

Lines changed: 1 addition & 3 deletions
@@ -222,11 +222,9 @@ class StateMetadataPartitionReader(
       } else {
         1
       }
-      val createMetadataDir = SparkSession.getActiveSession
-        .map(_.sessionState.conf.stateStoreCreateMetadataDirOnRead).getOrElse(false)
       OperatorStateMetadataReader.createReader(
         operatorIdPath, hadoopConf, operatorStateMetadataVersion, batchId,
-        createMetadataDir = createMetadataDir).read() match {
+        createMetadataDir = false).read() match {
       case Some(metadata) => metadata
       case None => throw StateDataSourceErrors.failedToReadOperatorMetadata(checkpointLocation,
         batchId)
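
Note: the operator metadata path gets the same treatment. A sketch of the state-metadata source that drives StateMetadataPartitionReader, again with a hypothetical checkpoint path:

    // List the operator metadata (operator id/name, state store names, batch id
    // range) recorded in an existing checkpoint; with createMetadataDir = false
    // the read never modifies the checkpoint's directory tree.
    val metaDf = spark.read
      .format("state-metadata")
      .load("/tmp/checkpoints/agg-query") // hypothetical
    metaDf.show()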

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala

Lines changed: 2 additions & 33 deletions
@@ -1507,8 +1507,8 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
  * Test suite that verifies the state data source reader does not create empty state
  * directories when reading state for all stateful operators.
  *
- * When `spark.sql.streaming.stateStore.createMetadataDirOnRead` is false (the default),
- * the reader should not call mkdirs on the schema metadata path. This is important for
+ * The reader does not create metadata directories (no mkdirs on the schema metadata path).
+ * This is important for
  * Unity Catalog environments where creating directories requires WRITE FILES permission,
  * but reading state should only require READ FILES permission (ES-1722614).
  *
@@ -1667,35 +1667,4 @@ class StateDataSourceNoEmptyDirCreationSuite extends StateDataSourceTestBase {
     )
   }
 
-  test("createMetadataDirOnRead=true recreates deleted state directory") {
-    withSQLConf(
-      SQLConf.STREAMING_CHECKPOINT_STATE_CREATE_METADATA_DIR_ON_READ.key -> "true") {
-      withTempDir { tempDir =>
-        val checkpointPath = tempDir.getAbsolutePath
-        runLargeDataStreamingAggregationQuery(checkpointPath)
-
-        val stateDir = new File(tempDir, "state")
-        assert(stateDir.exists(), "State directory should exist after running the query")
-        Utils.deleteRecursively(stateDir)
-        assert(!stateDir.exists(), "State directory should be deleted")
-
-        // With createMetadataDirOnRead=true, the reader will attempt to create
-        // the _metadata directory, which recreates part of the state directory tree
-        val e5 = intercept[Exception] {
-          spark.read
-            .format("statestore")
-            .option(StateSourceOptions.PATH, checkpointPath)
-            .load()
-            .collect()
-        }
-        assertCauseChainContains(e5,
-          classOf[StateDataSourceReadStateSchemaFailure])
-
-        // The state directory should be recreated (at least partially) because
-        // createMetadataDirOnRead=true causes mkdirs on the schema metadata path
-        assert(stateDir.exists(),
-          "With createMetadataDirOnRead=true, state directory should be recreated")
-      }
-    }
-  }
 }
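
Note: with the conf gone, the suite only needs to assert the no-mkdirs behavior. A hedged sketch of such a check, mirroring the removed test but inverting the final assertion (helpers such as runLargeDataStreamingAggregationQuery and the File/Utils imports are assumed from the surrounding suite):

    test("reading deleted state does not recreate the state directory") {
      withTempDir { tempDir =>
        val checkpointPath = tempDir.getAbsolutePath
        runLargeDataStreamingAggregationQuery(checkpointPath)

        val stateDir = new File(tempDir, "state")
        Utils.deleteRecursively(stateDir)

        // The read fails because the state is gone, but it must not create
        // the schema metadata directories along the way.
        intercept[Exception] {
          spark.read
            .format("statestore")
            .option(StateSourceOptions.PATH, checkpointPath)
            .load()
            .collect()
        }
        assert(!stateDir.exists(), "reader must not recreate the state directory")
      }
    }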
