
Commit 3998186

asl3 authored and gengliangwang committed
[SPARK-51747][SQL][FOLLOW-UP] Data source cached plan conf and migration guide
### What changes were proposed in this pull request?

Follow-up to #50538. Add a SQL legacy conf to enable/disable the change so that users can restore the previous behavior. Also add a migration guide note.

### Why are the changes needed?

The original PR changes the behavior of reading from a data source file with options. The flag gives users a way to restore the former behavior, if desired.

### Does this PR introduce _any_ user-facing change?

No (the original PR was a user-facing change; this PR simply adds a config).

### How was this patch tested?

Added a test for the config.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50571 from asl3/asl3/filedatasourcecache-docsconf.

Authored-by: Amanda Liu <amanda.liu@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 467644e

5 files changed: +102 additions, -29 deletions

docs/sql-migration-guide.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -64,6 +64,7 @@ license: |
 - Since Spark 4.0, Views allow control over how they react to underlying query changes. By default views tolerate column type changes in the query and compensate with casts. To disable this feature set `spark.sql.legacy.viewSchemaBindingMode` to `false`. This also removes the clause from `DESCRIBE EXTENDED` and `SHOW CREATE TABLE`.
 - Since Spark 4.0, The Storage-Partitioned Join feature flag `spark.sql.sources.v2.bucketing.pushPartValues.enabled` is set to `true`. To restore the previous behavior, set `spark.sql.sources.v2.bucketing.pushPartValues.enabled` to `false`.
 - Since Spark 4.0, the `sentences` function uses `Locale(language)` instead of `Locale.US` when `language` parameter is not `NULL` and `country` parameter is `NULL`.
+- Since Spark 4.0, reading from a file source table will correctly respect query options, e.g. delimiters. Previously, the first query plan was cached and subsequent option changes ignored. To restore the previous behavior, set `spark.sql.legacy.readFileSourceTableCacheIgnoreOptions` to `true`.
 
 ## Upgrading from Spark SQL 3.5.3 to 3.5.4
```
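A minimal sketch of the behavior change this note describes (the table `t` and its contents are hypothetical, mirroring the new DDLSuite test below; `spark` is an active `SparkSession`):

```scala
// Hypothetical CSV-backed table whose column values contain ';'.
spark.sql("CREATE TABLE t(a string, b string) USING CSV")
spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")

spark.table("t").show() // first read; the file source relation gets cached

// Since Spark 4.0 (default): the per-query option is honored and rows are re-split on ';'.
spark.read.option("delimiter", ";").table("t").show()

// Legacy behavior: reuse the cached plan, ignoring the changed option.
spark.conf.set("spark.sql.legacy.readFileSourceTableCacheIgnoreOptions", "true")
spark.read.option("delimiter", ";").table("t").show()
```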

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -5269,6 +5269,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS =
+    buildConf("spark.sql.legacy.readFileSourceTableCacheIgnoreOptions")
+      .internal()
+      .doc("When set to true, reading from file source table caches the first query plan and " +
+        "ignores subsequent changes in query options. Otherwise, query options will be applied " +
+        "to the cached plan and may produce different results.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val READ_SIDE_CHAR_PADDING = buildConf("spark.sql.readSideCharPadding")
     .doc("When true, Spark applies string padding when reading CHAR type columns/fields, " +
       "in addition to the write-side padding. This config is true by default to better enforce " +
```

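Because the conf is marked `.internal()`, it is consumed through `SQLConf.get` rather than documented publicly; a minimal sketch of how a rule reads it (mirroring the `FindDataSourceTable` change in the next file):

```scala
import org.apache.spark.sql.internal.SQLConf

// Reads the session value of the legacy flag; defaults to false per createWithDefault.
val ignoreOptions: Boolean =
  SQLConf.get.getConf(SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS)
```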
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -257,6 +257,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       QualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
+    val readFileSourceTableCacheIgnoreOptions =
+      SQLConf.get.getConf(SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS)
     catalog.getCachedTable(qualifiedTableName) match {
       case null =>
         val dataSource =
@@ -274,13 +276,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         catalog.cacheTable(qualifiedTableName, plan)
         plan
 
-      // If the cached table relation's options differ from the new options:
+      // If readFileSourceTableCacheIgnoreOptions is false AND
+      // the cached table relation's options differ from the new options:
       // 1. Create a new HadoopFsRelation with updated options
       // 2. Return a new LogicalRelation with the updated HadoopFsRelation
-      // This ensures the relation reflects any changes in data source options
+      // This ensures the relation reflects any changes in data source options.
+      // Otherwise, leave the cached table relation as is
       case r @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _, _)
-          if new CaseInsensitiveStringMap(fsRelation.options.asJava) !=
-            new CaseInsensitiveStringMap(dsOptions.asJava) =>
+          if !readFileSourceTableCacheIgnoreOptions &&
+            (new CaseInsensitiveStringMap(fsRelation.options.asJava) !=
+            new CaseInsensitiveStringMap(dsOptions.asJava)) =>
         val newFsRelation = fsRelation.copy(options = dsOptions)(sparkSession)
         r.copy(relation = newFsRelation)
```
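The guard compares the cached and incoming options through `CaseInsensitiveStringMap`, so maps differing only in key casing compare equal and keep the cached relation. A small sketch under that assumption:

```scala
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Keys that differ only in case compare equal, so such a change alone
// does not rebuild the cached HadoopFsRelation.
val cached = new CaseInsensitiveStringMap(Map("Delimiter" -> ";").asJava)
val incoming = new CaseInsensitiveStringMap(Map("delimiter" -> ";").asJava)
assert(cached == incoming)

// A genuinely different value is unequal; with the legacy conf off, this
// takes the fsRelation.copy(options = dsOptions) branch above.
val changed = new CaseInsensitiveStringMap(Map("delimiter" -> ",").asJava)
assert(cached != changed)
```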

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 76 additions & 23 deletions
```diff
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -1376,32 +1377,84 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
     }
   }
 
-  test("SPARK-51747: Data source cached plan should respect options") {
-    withTable("t") {
-      spark.sql("CREATE TABLE t(a string, b string) USING CSV".stripMargin)
-      spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
-      spark.sql("INSERT INTO TABLE t VALUES ('hello; world', 'test')")
-
-      // check initial contents of table
-      checkAnswer(spark.table("t"), Row("a;b", "c") :: Row("hello; world", "test") :: Nil)
+  test("SPARK-51747: Data source cached plan respects options if ignore conf disabled") {
+    val catalog = spark.sessionState.catalog
 
-      // no option
-      checkAnswer(
-        spark.sql("SELECT * FROM t"),
-        Row("a;b", "c") :: Row("hello; world", "test") :: Nil
-      )
+    // util to get cached table plan options
+    def getCachedTableOptions(
+        qualifiedTableName: QualifiedTableName): Map[String, String] = {
+      catalog.getCachedTable(qualifiedTableName) match {
+        case LogicalRelation(fsRelation: HadoopFsRelation, _, _, _, _) => fsRelation.options
+      }
+    }
 
-      // respect delimiter option
-      checkAnswer(
-        spark.sql("SELECT * FROM t WITH ('delimiter' = ';')"),
-        Row("a", "b,c") :: Row("hello", " world,test") :: Nil
-      )
+    Seq(true, false).foreach { ignoreOption =>
+      withSQLConf(
+        SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS.key -> ignoreOption.toString) {
+        withNamespace("ns") {
+          withTable("t") {
+            spark.sql(("CREATE TABLE t(a string, b string) " +
+              "USING CSV OPTIONS (maxColumns 500)").stripMargin)
+            spark.sql("INSERT INTO TABLE t VALUES ('a;b', 'c')")
+            spark.sql("INSERT INTO TABLE t VALUES ('hello; world', 'test')")
+
+            // check initial contents of table
+            val resultNoOptions = Row("a;b", "c") :: Row("hello; world", "test") :: Nil
+            checkAnswer(spark.table("t"), resultNoOptions)
+
+            // check cached plan contains create table options
+            val qualifiedTableName = QualifiedTableName(
+              CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
+            val pathOption = catalog.getTableMetadata(TableIdentifier("t"))
+              .storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+            val createTableOptions: Map[String, String] = Map("maxcolumns" -> "500") ++ pathOption
+            assert(getCachedTableOptions(qualifiedTableName) == createTableOptions)
+
+            // delimiter ; option
+            val expectedResultDelimiter =
+              if (ignoreOption) {
+                resultNoOptions
+              } else {
+                Row("a", "b,c") :: Row("hello", " world,test") :: Nil
+              }
+            checkAnswer(
+              spark.sql("SELECT * FROM t WITH ('delimiter' = ';')"),
+              expectedResultDelimiter
+            )
+            checkAnswer(
+              spark.read.option("delimiter", ";").table("t"), // scala API test
+              expectedResultDelimiter
+            )
+            // cached plan should still only contain create table options
+            assert(getCachedTableOptions(qualifiedTableName) == createTableOptions)
 
-      // respect lineSep option
-      checkAnswer(
-        spark.sql("SELECT * FROM t WITH ('lineSep' = ';')"),
-        Row("a", null) :: Row("b", "c\n") :: Row("hello", null) :: Row(" world", "test\n") :: Nil
-      )
+            // no option
+            checkAnswer(
+              spark.sql("SELECT * FROM t"),
+              resultNoOptions
+            )
+            assert(getCachedTableOptions(qualifiedTableName) == createTableOptions)
+
+            // lineSep ; option
+            val expectedResultLineSep =
+              if (ignoreOption) {
+                resultNoOptions
+              } else {
+                Row("a", null) :: Row("b", "c\n") :: Row("hello", null) ::
+                  Row(" world", "test\n") :: Nil
+              }
+            checkAnswer(
+              spark.sql("SELECT * FROM t WITH ('lineSep' = ';')"),
+              expectedResultLineSep
+            )
+            checkAnswer(
+              spark.read.option("lineSep", ";").table("t"), // scala API test
+              expectedResultLineSep
+            )
+            assert(getCachedTableOptions(qualifiedTableName) == createTableOptions)
+          }
+        }
+      }
     }
   }
```
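For reference, the two read paths the test exercises are equivalent ways of passing per-query options to a file source table (table `t` as created in the test):

```scala
// SQL options clause, as used in the test:
spark.sql("SELECT * FROM t WITH ('delimiter' = ';')")

// DataFrameReader equivalent (the "scala API test" in the comments above):
spark.read.option("delimiter", ";").table("t")
```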

sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala

Lines changed: 6 additions & 2 deletions
```diff
@@ -420,8 +420,12 @@ class FlatMapGroupsWithStateWithInitialStateSuite extends StateStoreMetricsTest
   Seq(true, false).foreach { skipEmittingInitialStateKeys =>
     testWithAllStateVersions("flatMapGroupsWithState - initial state and initial batch " +
       s"have same keys and skipEmittingInitialStateKeys=$skipEmittingInitialStateKeys") {
-      withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS.key ->
-        skipEmittingInitialStateKeys.toString) {
+      withSQLConf(
+        SQLConf.FLATMAPGROUPSWITHSTATE_SKIP_EMITTING_INITIAL_STATE_KEYS.key ->
+          skipEmittingInitialStateKeys.toString,
+        // restore behavior before SPARK-51747
+        SQLConf.READ_FILE_SOURCE_TABLE_CACHE_IGNORE_OPTIONS.key -> "true"
+      ) {
         val initialState = Seq(
           ("apple", 1L),
           ("orange", 2L)).toDS().groupByKey(_._1).mapValues(_._2)
```
