
Commit 6273a71

jmchung authored and gatorsmile committed
[SPARK-21610][SQL] Corrupt records are not handled properly when creating a dataframe from a file
## What changes were proposed in this pull request?

```
echo '{"field": 1}
{"field": 2}
{"field": "3"}' >/tmp/sample.json
```

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)
val file = "/tmp/sample.json"
val dfFromFile = spark.read.schema(schema).json(file)

scala> dfFromFile.show(false)
+-----+---------------+
|field|_corrupt_record|
+-----+---------------+
|1    |null           |
|2    |null           |
|null |{"field": "3"} |
+-----+---------------+

scala> dfFromFile.filter($"_corrupt_record".isNotNull).count()
res1: Long = 0

scala> dfFromFile.filter($"_corrupt_record".isNull).count()
res2: Long = 3
```

When the `requiredSchema` only contains `_corrupt_record`, the derived `actualSchema` is empty and `_corrupt_record` is null for all rows. This PR detects this situation and raises an exception with a reasonable workaround message so that users know what happened and how to fix the query.

## How was this patch tested?

Added test case.

Author: Jen-Ming Chung <[email protected]>

Closes apache#18865 from jmchung/SPARK-21610.
1 parent 520d92a commit 6273a71
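As a quick reference for anyone hitting SPARK-21610 on an unpatched build, here is a minimal sketch of the workaround the commit message and migration note describe: cache (or save) the parsed result before querying only the corrupt-record column. It assumes the `/tmp/sample.json` file and schema from the repro above and an existing `spark` session, as in `spark-shell`:

```scala
import org.apache.spark.sql.types.{ByteType, StringType, StructType}

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)

// Caching materializes the parse with the full schema, so later queries that
// reference only _corrupt_record see the populated column instead of all nulls.
val df = spark.read.schema(schema).json("/tmp/sample.json").cache()

import spark.implicits._  // for the $"..." column syntax outside spark-shell
df.filter($"_corrupt_record".isNotNull).count()  // expected: 1 (the {"field": "3"} record)
df.filter($"_corrupt_record".isNull).count()     // expected: 2
```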

File tree

3 files changed: +47 lines, −0 lines

docs/sql-programming-guide.md

Lines changed: 4 additions & 0 deletions
@@ -1543,6 +1543,10 @@ options.
 
 # Migration Guide
 
+## Upgrading From Spark SQL 2.2 to 2.3
+
+  - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
+
 ## Upgrading From Spark SQL 2.1 to 2.2
 
   - Spark 2.1.1 introduced a new configuration key: `spark.sql.hive.caseSensitiveInferenceMode`. It had a default setting of `NEVER_INFER`, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting's default value to `INFER_AND_SAVE` to restore compatibility with reading Hive metastore tables whose underlying file schemas have mixed-case column names. With the `INFER_AND_SAVE` configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time-consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set `spark.sql.hive.caseSensitiveInferenceMode` to `NEVER_INFER` to avoid the initial overhead of schema inference. Note that with the new default `INFER_AND_SAVE` setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table's first access.
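An aside on the context paragraph above (not part of this commit): opting back out of the new default is an ordinary SQL conf change. A sketch, using the key and values named in that paragraph:

```scala
// Set before the first access to any Hive metastore table to skip the one-time
// schema inference that the INFER_AND_SAVE default would otherwise trigger.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
```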

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 14 additions & 0 deletions
@@ -113,6 +113,20 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       }
     }
 
+    if (requiredSchema.length == 1 &&
+      requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named ${parsedOptions.columnNameOfCorruptRecord} by default). For example:\n" +
+          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+
     (file: PartitionedFile) => {
       val parser = new JacksonParser(actualSchema, parsedOptions)
       JsonDataSource(parsedOptions).readFile(
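For context on where the new guard sits: the schema handed to `JacksonParser` is the required schema with the corrupt-record column stripped out, so a projection of only `_corrupt_record` leaves the parser with an empty schema and nothing is ever flagged as corrupt — which is exactly the all-null behaviour in the repro. A simplified, self-contained sketch of that effect (the local variable names are illustrative, not the exact source):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// Illustrative stand-ins for what the file format receives from the planner.
val columnNameOfCorruptRecord = "_corrupt_record"
val requiredSchema = new StructType().add("_corrupt_record", StringType)

// The data schema given to the parser drops the corrupt-record column; when only
// _corrupt_record was requested, it ends up empty, so no field is parsed and no
// record is ever marked corrupt.
val actualSchema = StructType(
  requiredSchema.filterNot(_.name == columnNameOfCorruptRecord))

assert(actualSchema.isEmpty)  // hence the guard added above
```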

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 29 additions & 0 deletions
@@ -2034,4 +2034,33 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-21610: Corrupt records are not handled properly when creating a dataframe " +
+    "from a file") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val data =
+        """{"field": 1}
+          |{"field": 2}
+          |{"field": "3"}""".stripMargin
+      Seq(data).toDF().repartition(1).write.text(path)
+      val schema = new StructType().add("field", ByteType).add("_corrupt_record", StringType)
+      // negative cases
+      val msg = intercept[AnalysisException] {
+        spark.read.schema(schema).json(path).select("_corrupt_record").collect()
+      }.getMessage
+      assert(msg.contains("only include the internal corrupt record column"))
+      intercept[catalyst.errors.TreeNodeException[_]] {
+        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
+      }
+      // workaround
+      val df = spark.read.schema(schema).json(path).cache()
+      assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
+      assert(df.filter($"_corrupt_record".isNull).count() == 2)
+      checkAnswer(
+        df.select("_corrupt_record"),
+        Row(null) :: Row(null) :: Row("{\"field\": \"3\"}") :: Nil
+      )
+    }
+  }
 }
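Read together, the two negative cases above pin down how the guard surfaces to users: a bare projection of the corrupt-record column fails with `AnalysisException`, while the `count()` variant hits the same guard during execution and comes back as a catalyst `TreeNodeException`. A sketch of the equivalent user-facing repro, reusing the schema and sample file from the commit message:

```scala
import org.apache.spark.sql.types.{ByteType, StringType, StructType}

val schema = new StructType()
  .add("field", ByteType)
  .add("_corrupt_record", StringType)
val dfFromFile = spark.read.schema(schema).json("/tmp/sample.json")

// Per the test above: fails with AnalysisException
// ("... referenced columns only include the internal corrupt record column ...").
dfFromFile.select("_corrupt_record").collect()

// Per the test above: the same guard fires at execution time,
// surfaced as a catalyst TreeNodeException.
import spark.implicits._
dfFromFile.filter($"_corrupt_record".isNotNull).count()
```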
