
Commit 7d0a3ef

jmchung authored and HyukjinKwon committed
[SPARK-21610][SQL][FOLLOWUP] Corrupt records are not handled properly when creating a dataframe from a file
## What changes were proposed in this pull request?

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and the `_corrupt_record` values are null for all rows. This PR catches this situation and raises an exception with a reasonable workaround message, so that users know what happened and how to fix the query.

## How was this patch tested?

Added a unit test in `CSVSuite`.

Author: Jen-Ming Chung <[email protected]>

Closes apache#19199 from jmchung/SPARK-21610-FOLLOWUP.
1 parent dd78167 commit 7d0a3ef
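A minimal sketch of the behavior this change enforces, not part of the commit itself: it assumes Spark 2.3+, and the file path /tmp/malformed.csv, the schema, and the SparkSession setup are illustrative placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("corrupt-record-demo").getOrCreate()
import spark.implicits._

// Hypothetical schema: two data columns plus the corrupt record column.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_corrupt_record", StringType)

// After this change, querying only _corrupt_record directly from a raw CSV read
// throws AnalysisException instead of silently returning all-null values:
// spark.read.schema(schema).csv("/tmp/malformed.csv").select("_corrupt_record").show()

// Workaround suggested by the error message: cache the parsed result, then query it.
val df = spark.read.schema(schema).csv("/tmp/malformed.csv").cache()
df.filter($"_corrupt_record".isNotNull).count()
```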

File tree: 3 files changed (+57 −1 lines changed)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 14 additions & 0 deletions
@@ -109,6 +109,20 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named _corrupt_record by default). For example:\n" +
+          "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).csv(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
       val parser = new UnivocityParser(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
           "referenced columns only include the internal corrupt record column\n" +
-          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          s"(named _corrupt_record by default). For example:\n" +
           "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
           "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
           "Instead, you can cache or save the parsed results and then send the same query.\n" +

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 42 additions & 0 deletions
@@ -1203,4 +1203,46 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .csv(Seq("a").toDS())
     checkAnswer(df, Row("a", null, "a"))
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    val columnNameOfCorruptRecord = "_corrupt_record"
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", TimestampType)
+      .add(columnNameOfCorruptRecord, StringType)
+    // negative cases
+    val msg = intercept[AnalysisException] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .select(columnNameOfCorruptRecord)
+        .collect()
+    }.getMessage
+    assert(msg.contains("only include the internal corrupt record column"))
+    intercept[org.apache.spark.sql.catalyst.errors.TreeNodeException[_]] {
+      spark
+        .read
+        .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+        .schema(schema)
+        .csv(testFile(valueMalformedFile))
+        .filter($"_corrupt_record".isNotNull)
+        .count()
+    }
+    // workaround
+    val df = spark
+      .read
+      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+      .schema(schema)
+      .csv(testFile(valueMalformedFile))
+      .cache()
+    assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+    assert(df.filter($"_corrupt_record".isNull).count() == 1)
+    checkAnswer(
+      df.select(columnNameOfCorruptRecord),
+      Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil
+    )
+  }
 }
