Commit 76813cf

MaxGekk authored and HyukjinKwon committed
[SPARK-25950][SQL] from_csv should respect to spark.sql.columnNameOfCorruptRecord
## What changes were proposed in this pull request?

Fix for `CsvToStructs` to take into account the SQL config `spark.sql.columnNameOfCorruptRecord`, similar to `from_json`.

## How was this patch tested?

Added a new test where `spark.sql.columnNameOfCorruptRecord` is set to a corrupt column name different from the default.

Closes apache#22956 from MaxGekk/csv-tests.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
1 parent 63ca4bb commit 76813cf
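
For illustration only (not part of the commit): a minimal sketch of the user-visible effect, assuming a live `SparkSession` named `spark`. The schema and inputs mirror the new test added below; the config key and the `from_csv` signature are as used in the diff.

```scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

import spark.implicits._

// Rename the session-level corrupt-record column; after this fix,
// from_csv honors the setting instead of the hard-coded default.
spark.conf.set("spark.sql.columnNameOfCorruptRecord", "_unparsed")

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("_unparsed", StringType)

// "\"" is a malformed CSV record; "2,12" parses cleanly.
val df = Seq("\"", "2,12").toDF("value")

// In PERMISSIVE mode the malformed input lands in "_unparsed":
//   Row(null, null, "\"") and Row(2, 12, null)
df.select(from_csv($"value", schema, Map("mode" -> "PERMISSIVE"))).show(false)
```

Before the fix, `CsvToStructs` looked only for the default `_corrupt_record` column name, so a schema using the configured name left that column permanently null.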

File tree

2 files changed: +39 −1 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala

Lines changed: 8 additions & 1 deletion
```diff
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.csv._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -92,8 +93,14 @@ case class CsvToStructs(
     }
   }

+  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
   @transient lazy val parser = {
-    val parsedOptions = new CSVOptions(options, columnPruning = true, timeZoneId.get)
+    val parsedOptions = new CSVOptions(
+      options,
+      columnPruning = true,
+      defaultTimeZoneId = timeZoneId.get,
+      defaultColumnNameOfCorruptRecord = nameOfCorruptRecord)
     val mode = parsedOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
       throw new AnalysisException(s"from_csv() doesn't support the ${mode.name} mode. " +
```
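
A note on the mechanism: `SQLConf.get` returns the `SQLConf` of the active session, so the expression picks up whatever the user configured rather than a compile-time default. The simplified, hypothetical sketch below isolates that flow from the hunk above; the `Map.empty` and the `"UTC"` literal are placeholders (the real expression passes its `options` field and `timeZoneId.get`), while the `CSVOptions` parameter names match the diff.

```scala
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.internal.SQLConf

// Resolve the session-level corrupt-record column name once, then pass it
// to CSVOptions as the default instead of relying on the hard-coded
// "_corrupt_record" fallback.
val nameOfCorruptRecord: String =
  SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)

val parsedOptions = new CSVOptions(
  Map.empty[String, String],   // placeholder for the user-supplied CSV options
  columnPruning = true,
  defaultTimeZoneId = "UTC",   // placeholder; the expression uses timeZoneId.get
  defaultColumnNameOfCorruptRecord = nameOfCorruptRecord)
```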

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala

Lines changed: 31 additions & 0 deletions
```diff
@@ -19,7 +19,9 @@ package org.apache.spark.sql

 import scala.collection.JavaConverters._

+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

@@ -86,4 +88,33 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {

     checkAnswer(df.select(to_csv($"a", options)), Row("26/08/2015 18:00") :: Nil)
   }
+
+  test("from_csv invalid csv - check modes") {
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      val schema = new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)
+        .add("_unparsed", StringType)
+      val badRec = "\""
+      val df = Seq(badRec, "2,12").toDS()
+
+      checkAnswer(
+        df.select(from_csv($"value", schema, Map("mode" -> "PERMISSIVE"))),
+        Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)
+
+      val exception1 = intercept[SparkException] {
+        df.select(from_csv($"value", schema, Map("mode" -> "FAILFAST"))).collect()
+      }.getMessage
+      assert(exception1.contains(
+        "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))
+
+      val exception2 = intercept[SparkException] {
+        df.select(from_csv($"value", schema, Map("mode" -> "DROPMALFORMED")))
+          .collect()
+      }.getMessage
+      assert(exception2.contains(
+        "from_csv() doesn't support the DROPMALFORMED mode. " +
+          "Acceptable modes are PERMISSIVE and FAILFAST."))
+    }
+  }
 }
```
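
For readers unfamiliar with the test helper: `withSQLConf` (available to the suite through `SharedSQLContext`'s test utilities) scopes a config change to the enclosed block and restores the previous value afterwards, even if the body throws. Below is a rough, hypothetical re-implementation of that pattern, assuming a `SparkSession` named `spark` in scope; the helper name `withConfSketch` is made up and is not Spark's actual implementation.

```scala
// Hypothetical sketch of the withSQLConf pattern: record the prior values,
// apply the new ones, run the body, then restore (or unset) on the way out.
def withConfSketch(pairs: (String, String)*)(body: => Unit): Unit = {
  val conf = spark.sessionState.conf
  val previous = pairs.map { case (k, _) =>
    k -> (if (conf.contains(k)) Some(conf.getConfString(k)) else None)
  }
  pairs.foreach { case (k, v) => conf.setConfString(k, v) }
  try body finally previous.foreach {
    case (k, Some(v)) => conf.setConfString(k, v)
    case (k, None)    => conf.unsetConf(k)
  }
}
```

Usage mirrors the test above, e.g. `withConfSketch(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") { /* assertions */ }`, which is why the assertions inside the block can rely on `_unparsed` being the corrupt-record column name.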
