
Commit c7e2742

MaxGekk authored and gatorsmile committed
[SPARK-24190][SQL] Allow saving of JSON files in UTF-16 and UTF-32
## What changes were proposed in this pull request?

Currently, the restrictions in JSONOptions for `encoding` and `lineSep` are the same for read and for write. For example, the `lineSep` requirement means that the write

```
df.write.option("encoding", "UTF-32BE").json(file)
```

cannot skip `lineSep` and rely on its default value `\n`, because it throws the exception:

```
requirement failed: The lineSep option must be specified for the UTF-32BE encoding
java.lang.IllegalArgumentException: requirement failed: The lineSep option must be specified for the UTF-32BE encoding
```

In this PR, I propose to separate JSONOptions for read and for write, and to make JSONOptions in write less restrictive.

## How was this patch tested?

Added a new test for blacklisted encodings in read. The `lineSep` option was also removed in write for some tests.

Author: Maxim Gekk <[email protected]>
Author: Maxim Gekk <[email protected]>

Closes apache#21247 from MaxGekk/json-options-in-write.
1 parent 4e7d867 commit c7e2742
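A minimal usage sketch of the user-facing effect (not part of the commit; it assumes an active `SparkSession` named `spark` and a hypothetical output path): after this change, a write in a non-UTF-8 encoding no longer needs an explicit `lineSep`, while the read path keeps its stricter checks.

```scala
import spark.implicits._  // assumes an existing SparkSession named `spark`

val df = Seq(("a", 1), ("b", 2)).toDF("name", "value")

// Write: the default lineSep ("\n") is now accepted for any supported encoding.
df.write
  .option("encoding", "UTF-32BE")
  .json("/tmp/json-utf32be")          // hypothetical output path

// Read: with multiLine disabled, non-UTF-8 encodings still require an explicit lineSep.
val readBack = spark.read
  .option("encoding", "UTF-32BE")
  .option("lineSep", "\n")
  .json("/tmp/json-utf32be")
```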

File tree: 3 files changed, +95 -45 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 49 additions & 22 deletions
```diff
@@ -97,32 +97,16 @@ private[sql] class JSONOptions(
     sep
   }
 
+  protected def checkedEncoding(enc: String): String = enc
+
   /**
    * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
-   * If the encoding is not specified (None), it will be detected automatically
-   * when the multiLine option is set to `true`.
+   * If the encoding is not specified (None) in read, it will be detected automatically
+   * when the multiLine option is set to `true`. If encoding is not specified in write,
+   * UTF-8 is used by default.
    */
   val encoding: Option[String] = parameters.get("encoding")
-    .orElse(parameters.get("charset")).map { enc =>
-      // The following encodings are not supported in per-line mode (multiline is false)
-      // because they cause some problems in reading files with BOM which is supposed to
-      // present in the files with such encodings. After splitting input files by lines,
-      // only the first lines will have the BOM which leads to impossibility for reading
-      // the rest lines. Besides of that, the lineSep option must have the BOM in such
-      // encodings which can never present between lines.
-      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
-      val isBlacklisted = blacklist.contains(Charset.forName(enc))
-      require(multiLine || !isBlacklisted,
-        s"""The $enc encoding in the blacklist is not allowed when multiLine is disabled.
-           |Blacklist: ${blacklist.mkString(", ")}""".stripMargin)
-
-      val isLineSepRequired =
-        multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
-
-      require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
-
-      enc
-    }
+    .orElse(parameters.get("charset")).map(checkedEncoding)
 
   val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
     lineSep.getBytes(encoding.getOrElse("UTF-8"))
@@ -141,3 +125,46 @@ private[sql] class JSONOptions(
     factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, allowUnquotedControlChars)
   }
 }
+
+private[sql] class JSONOptionsInRead(
+    @transient override val parameters: CaseInsensitiveMap[String],
+    defaultTimeZoneId: String,
+    defaultColumnNameOfCorruptRecord: String)
+  extends JSONOptions(parameters, defaultTimeZoneId, defaultColumnNameOfCorruptRecord) {
+
+  def this(
+      parameters: Map[String, String],
+      defaultTimeZoneId: String,
+      defaultColumnNameOfCorruptRecord: String = "") = {
+    this(
+      CaseInsensitiveMap(parameters),
+      defaultTimeZoneId,
+      defaultColumnNameOfCorruptRecord)
+  }
+
+  protected override def checkedEncoding(enc: String): String = {
+    val isBlacklisted = JSONOptionsInRead.blacklist.contains(Charset.forName(enc))
+    require(multiLine || !isBlacklisted,
+      s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
+         |Blacklist: ${JSONOptionsInRead.blacklist.mkString(", ")}""".stripMargin)
+
+    val isLineSepRequired =
+      multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
+    require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
+
+    enc
+  }
+}
+
+private[sql] object JSONOptionsInRead {
+  // The following encodings are not supported in per-line mode (multiline is false)
+  // because they cause some problems in reading files with BOM which is supposed to
+  // present in the files with such encodings. After splitting input files by lines,
+  // only the first lines will have the BOM which leads to impossibility for reading
+  // the rest lines. Besides of that, the lineSep option must have the BOM in such
+  // encodings which can never present between lines.
+  val blacklist = Seq(
+    Charset.forName("UTF-16"),
+    Charset.forName("UTF-32")
+  )
+}
```
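For readers unfamiliar with the pattern used above, here is a simplified, hypothetical sketch (not the actual Spark classes; names and fields are reduced to what the encoding check needs): the base options class accepts any charset name, and the read-side subclass overrides the `checkedEncoding` hook to enforce the blacklist and the `lineSep` requirement.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Simplified stand-ins for JSONOptions / JSONOptionsInRead; hypothetical names.
class WriteOptions(params: Map[String, String]) {
  val multiLine: Boolean = params.get("multiLine").exists(_.toBoolean)
  val lineSeparator: Option[String] = params.get("lineSep")

  // Hook: on the write path, any charset name is accepted as-is.
  protected def checkedEncoding(enc: String): String = enc

  val encoding: Option[String] = params.get("encoding").map(checkedEncoding)
}

object ReadOptions {
  // Kept in a companion object so it is available while the superclass constructor runs.
  val blacklist: Seq[Charset] = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
}

class ReadOptions(params: Map[String, String]) extends WriteOptions(params) {
  // Hook override: the read path rejects blacklisted charsets unless multiLine is on,
  // and requires an explicit lineSep for non-UTF-8 charsets in per-line mode.
  protected override def checkedEncoding(enc: String): String = {
    val charset = Charset.forName(enc)
    require(multiLine || !ReadOptions.blacklist.contains(charset),
      s"The $enc encoding must not be used when multiLine is disabled")
    require(multiLine || charset == StandardCharsets.UTF_8 || lineSeparator.nonEmpty,
      s"The lineSep option must be specified for the $enc encoding")
    enc
  }
}
```

With these stand-ins, `new WriteOptions(Map("encoding" -> "UTF-16"))` constructs fine, while `new ReadOptions(Map("encoding" -> "UTF-16"))` fails with an `IllegalArgumentException`, mirroring the split the real classes now implement.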

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions, JSONOptionsInRead}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -40,7 +40,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       path: Path): Boolean = {
-    val parsedOptions = new JSONOptions(
+    val parsedOptions = new JSONOptionsInRead(
       options,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -52,7 +52,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
-    val parsedOptions = new JSONOptions(
+    val parsedOptions = new JSONOptionsInRead(
       options,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -99,7 +99,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-    val parsedOptions = new JSONOptions(
+    val parsedOptions = new JSONOptionsInRead(
       options,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
       sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -158,6 +158,11 @@ private[json] class JsonOutputWriter(
     case None => StandardCharsets.UTF_8
   }
 
+  if (JSONOptionsInRead.blacklist.contains(encoding)) {
+    logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
+      " which can be read back by Spark only if multiLine is enabled.")
+  }
+
   private val writer = CodecStreams.createOutputStreamWriter(
     context, new Path(path), encoding)
 
```
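A hedged round-trip sketch of the situation the warning above describes (hypothetical paths, assuming a `SparkSession` named `spark` with `spark.implicits._` in scope): writing in a blacklisted encoding succeeds but logs the warning, and the output can be read back once multiLine is enabled.

```scala
val ds = Seq(("a", 1)).toDS()          // assumes an existing SparkSession and spark.implicits._

ds.write
  .option("encoding", "UTF-16")        // blacklisted for per-line reads; JsonOutputWriter logs a warning
  .json("/tmp/json-utf16")             // hypothetical output path

val readBack = spark.read
  .option("encoding", "UTF-16")
  .option("multiLine", true)           // required to read a blacklisted encoding back
  .json("/tmp/json-utf16")
```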

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 37 additions & 19 deletions
```diff
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
-import java.io.{File, FileOutputStream, StringWriter}
-import java.nio.charset.{StandardCharsets, UnsupportedCharsetException}
-import java.nio.file.{Files, Paths, StandardOpenOption}
+import java.io._
+import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.util.Locale
 
@@ -2262,7 +2262,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     withTempPath { path =>
       val df = spark.createDataset(Seq(("Dog", 42)))
       df.write
-        .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+        .options(Map("encoding" -> encoding))
         .json(path.getCanonicalPath)
 
       checkEncoding(
@@ -2286,16 +2286,22 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-23723: wrong output encoding") {
     val encoding = "UTF-128"
-    val exception = intercept[UnsupportedCharsetException] {
+    val exception = intercept[SparkException] {
       withTempPath { path =>
         val df = spark.createDataset(Seq((0)))
         df.write
-          .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+          .options(Map("encoding" -> encoding))
           .json(path.getCanonicalPath)
       }
     }
 
-    assert(exception.getMessage == encoding)
+    val baos = new ByteArrayOutputStream()
+    val ps = new PrintStream(baos, true, "UTF-8")
+    exception.printStackTrace(ps)
+    ps.flush()
+
+    assert(baos.toString.contains(
+      "java.nio.charset.UnsupportedCharsetException: UTF-128"))
   }
 
   test("SPARK-23723: read back json in UTF-16LE") {
@@ -2316,18 +2322,17 @@
   test("SPARK-23723: write json in UTF-16/32 with multiline off") {
     Seq("UTF-16", "UTF-32").foreach { encoding =>
       withTempPath { path =>
-        val ds = spark.createDataset(Seq(
-          ("a", 1), ("b", 2), ("c", 3))
-        ).repartition(2)
-        val e = intercept[IllegalArgumentException] {
-          ds.write
-            .option("encoding", encoding)
-            .option("multiline", "false")
-            .format("json").mode("overwrite")
-            .save(path.getCanonicalPath)
-        }.getMessage
-        assert(e.contains(
-          s"$encoding encoding in the blacklist is not allowed when multiLine is disabled"))
+        val ds = spark.createDataset(Seq(("a", 1))).repartition(1)
+        ds.write
+          .option("encoding", encoding)
+          .option("multiline", false)
+          .json(path.getCanonicalPath)
+        val jsonFiles = path.listFiles().filter(_.getName.endsWith("json"))
+        jsonFiles.foreach { jsonFile =>
+          val readback = Files.readAllBytes(jsonFile.toPath)
+          val expected = ("""{"_1":"a","_2":1}""" + "\n").getBytes(Charset.forName(encoding))
+          assert(readback === expected)
+        }
       }
     }
   }
@@ -2476,4 +2481,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(df, Row(Row(1, "string")) :: Row(Row(2, null)) :: Row(null) :: Nil)
     }
   }
+
+  test("SPARK-24190: restrictions for JSONOptions in read") {
+    for (encoding <- Set("UTF-16", "UTF-32")) {
+      val exception = intercept[IllegalArgumentException] {
+        spark.read
+          .option("encoding", encoding)
+          .option("multiLine", false)
+          .json(testFile("test-data/utf16LE.json"))
+          .count()
+      }
+      assert(exception.getMessage.contains("encoding must not be included in the blacklist"))
+    }
+  }
 }
```
