
Commit f90cfe6

MaxGekk authored and Jackey Lee committed
[SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring
## What changes were proposed in this pull request?

Currently, SQL configs are not propagated to executors during schema inference in the CSV datasource. For example, changing `spark.sql.legacy.timeParser.enabled` has no effect on inferring timestamp types. In this PR, I propose to fix the issue by wrapping the schema inference action in `SQLExecution.withSQLConfPropagated`.

## How was this patch tested?

Added logging to `TimestampFormatter`:

```patch
-object TimestampFormatter {
+object TimestampFormatter extends Logging {
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
+      logError("LegacyFallbackTimestampFormatter is being used")
       new LegacyFallbackTimestampFormatter(format, timeZone, locale)
     } else {
+      logError("Iso8601TimestampFormatter is being used")
       new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
```

and ran the following commands in `spark-shell`:

```shell
$ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true
```

```scala
scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo")

scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "yyyy|MM|dd").csv("/tmp/foo").printSchema()
18/12/18 10:47:27 ERROR TimestampFormatter: LegacyFallbackTimestampFormatter is being used
root
 |-- _c0: timestamp (nullable = true)
```

Closes apache#23345 from MaxGekk/csv-schema-infer-propagate-configs.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent 3f1927f commit f90cfe6
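
For context, `SQLExecution.withSQLConfPropagated` makes the session's SQL configs visible to code running in tasks by publishing them as `SparkContext` local properties around an action. The following is a minimal, self-contained sketch of that pattern, not Spark's actual implementation; the helper name `withConfsPropagated` and the `spark.sql` prefix filter are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the propagation pattern (illustrative, not Spark's internal code):
// copy the session's SQL configs into SparkContext local properties before
// running `body`, so jobs launched by `body` carry them to their tasks, then
// restore the previous values afterwards.
def withConfsPropagated[T](spark: SparkSession)(body: => T): T = {
  val sc = spark.sparkContext
  val sqlConfs = spark.conf.getAll.filter { case (k, _) => k.startsWith("spark.sql") }
  val saved = sqlConfs.map { case (k, v) =>
    val previous = sc.getLocalProperty(k)
    sc.setLocalProperty(k, v) // visible to tasks of jobs launched below
    k -> previous
  }
  try body finally {
    // Restore (or clear) the local properties we overwrote.
    saved.foreach { case (k, previous) => sc.setLocalProperty(k, previous) }
  }
}
```

A caller would wrap an RDD action, e.g. `withConfsPropagated(spark) { rdd.count() }`, mirroring how the diff below wraps the `CSVInferSchema.infer` calls.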

1 file changed (+7, −2)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

```diff
@@ -35,6 +35,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, CSVOptions, UnivocityParser}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -135,7 +136,9 @@ object TextInputCSVDataSource extends CSVDataSource {
         val parser = new CsvParser(parsedOptions.asParserSettings)
         linesWithoutHeader.map(parser.parseLine)
       }
-      new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+      SQLExecution.withSQLConfPropagated(csv.sparkSession) {
+        new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
+      }
     case _ =>
       // If the first line could not be read, just return the empty schema.
       StructType(Nil)
@@ -208,7 +211,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
             encoding = parsedOptions.charset)
         }
         val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
-        new CSVInferSchema(parsedOptions).infer(sampled, header)
+        SQLExecution.withSQLConfPropagated(sparkSession) {
+          new CSVInferSchema(parsedOptions).infer(sampled, header)
+        }
       case None =>
         // If the first row could not be read, just return the empty schema.
         StructType(Nil)
```
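
On the executor side of this mechanism: local properties set on the driver travel with each job and can be read from tasks via `TaskContext`, which is the channel `SQLConf.get` relies on there, and why the wrapped `infer` calls above now observe session configs. A hedged `spark-shell` sketch (the config key and the throwaway RDD are illustrative, and `spark` is the shell's session):

```scala
import org.apache.spark.TaskContext

// Illustrative check: a driver-set local property is visible inside tasks.
spark.sparkContext.setLocalProperty("spark.sql.legacy.timeParser.enabled", "true")
spark.sparkContext.parallelize(1 to 4, 2).foreachPartition { _ =>
  val v = TaskContext.get().getLocalProperty("spark.sql.legacy.timeParser.enabled")
  println(s"propagated value seen in task: $v") // prints "true"
}
```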
