
Commit 34c4b9c

HyukjinKwon authored and cloud-fan committed
[SPARK-23765][SQL] Supports custom line separator for json datasource
## What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in the JSON datasource. It supports this option by using `LineRecordReader`'s functionality, passing the separator to its constructor. The approach is similar to apache#20727; however, one main difference is that it uses the text datasource's `lineSep` option to parse line by line in JSON's schema inference.

## How was this patch tested?

Manually tested, and unit tests were added.

Author: hyukjinkwon <[email protected]>
Author: hyukjinkwon <[email protected]>

Closes apache#20877 from HyukjinKwon/linesep-json.
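For illustration, a minimal usage sketch of the new option from PySpark (paths and separator values are hypothetical, assuming a Spark build that includes this patch):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read JSON records delimited by a custom separator instead of newlines.
df = spark.read.json("/tmp/data.json", lineSep="|")  # hypothetical path

# Write the records back out, terminating each one with the same separator.
df.write.json("/tmp/out", lineSep="|")

# Omitting lineSep keeps the defaults: all of \r, \r\n and \n on read,
# and \n on write.
```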
1 parent ed72bad commit 34c4b9c

File tree: 12 files changed, +136 −15 lines

python/pyspark/sql/readwriter.py

Lines changed: 10 additions & 4 deletions

@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None, allowUnquotedControlChars=None):
+             multiLine=None, allowUnquotedControlChars=None, lineSep=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                           characters (ASCII characters with value less than 32,
                                           including tab and line feed characters) or not.
+        :param lineSep: defines the line separator that should be used for parsing. If None is
+                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -254,7 +256,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars)
+            allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -746,7 +748,8 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
             self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
+             lineSep=None):
         """Saves the content of the :class:`DataFrame` in JSON format
         (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
         specified path.
@@ -770,12 +773,15 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
                             formats follow the formats at ``java.text.SimpleDateFormat``.
                             This applies to timestamp type. If None is set, it uses the
                             default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+        :param lineSep: defines the line separator that should be used for writing. If None is
+                        set, it uses the default value, ``\\n``.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
+            lineSep=lineSep)
         self._jwrite.json(path)
 
     @since(1.4)
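A round-trip sketch of the writer half of this API (temporary paths, assuming a running `spark` session as in the PySpark shell); the separator is emitted after every record in each part file:

```python
import os
import tempfile

out = os.path.join(tempfile.mkdtemp(), "data")
spark.range(3).write.json(out, lineSep="^")

# Each part file now holds records like {"id":0}^{"id":1}^ ...,
# and reading back requires the same separator:
assert spark.read.json(out, lineSep="^").count() == 3
```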

python/pyspark/sql/streaming.py

Lines changed: 4 additions & 2 deletions

@@ -405,7 +405,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None, allowUnquotedControlChars=None):
+             multiLine=None, allowUnquotedControlChars=None, lineSep=None):
         """
         Loads a JSON file stream and returns the results as a :class:`DataFrame`.
@@ -468,6 +468,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                           characters (ASCII characters with value less than 32,
                                           including tab and line feed characters) or not.
+        :param lineSep: defines the line separator that should be used for parsing. If None is
+                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -482,7 +484,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars)
+            allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:
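The streaming reader accepts the same option; a short sketch (schema and input directory are illustrative, assuming a running `spark` session):

```python
from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([StructField("name", StringType(), True)])

# Stream JSON records separated by "|" from a (hypothetical) input directory.
sdf = spark.readStream.json("/tmp/stream-input", schema=schema, lineSep="|")
assert sdf.isStreaming
```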

python/pyspark/sql/tests.py

Lines changed: 17 additions & 0 deletions

@@ -676,6 +676,23 @@ def test_multiline_json(self):
                                           multiLine=True)
         self.assertEqual(people1.collect(), people_array.collect())
 
+    def test_linesep_json(self):
+        df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
+        expected = [Row(_corrupt_record=None, name=u'Michael'),
+                    Row(_corrupt_record=u' "age":30}\n{"name":"Justin"', name=None),
+                    Row(_corrupt_record=u' "age":19}\n', name=None)]
+        self.assertEqual(df.collect(), expected)
+
+        tpath = tempfile.mkdtemp()
+        shutil.rmtree(tpath)
+        try:
+            df = self.spark.read.json("python/test_support/sql/people.json")
+            df.write.json(tpath, lineSep="!!")
+            readback = self.spark.read.json(tpath, lineSep="!!")
+            self.assertEqual(readback.collect(), df.collect())
+        finally:
+            shutil.rmtree(tpath)
+
     def test_multiline_csv(self):
         ages_newlines = self.spark.read.csv(
             "python/test_support/sql/ages_newlines.csv", multiLine=True)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 11 additions & 0 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
+import java.nio.charset.StandardCharsets
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -85,6 +86,16 @@ private[sql] class JSONOptions(
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
+  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
+    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
+    sep
+  }
+  // Note that the option 'lineSep' uses a different default value in read and write.
+  val lineSeparatorInRead: Option[Array[Byte]] =
+    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+  // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
+
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
     factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
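One consequence of the `require` above: an empty `lineSep` is rejected when the options are parsed. A sketch of how that surfaces in PySpark (the exact exception type seen through Py4J may vary):

```python
# 'lineSep' must be non-empty; JSONOptions raises
# "'lineSep' cannot be an empty string." otherwise.
try:
    spark.read.json("/tmp/data.json", lineSep="")  # hypothetical path
except Exception as e:
    print(e)  # IllegalArgumentException from the JVM side
```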

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala

Lines changed: 7 additions & 1 deletion

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.json
 
 import java.io.Writer
+import java.nio.charset.StandardCharsets
 
 import com.fasterxml.jackson.core._
 
@@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(
 
   private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
+  private val lineSeparator: String = options.lineSeparatorInWrite
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -251,5 +254,8 @@ private[sql] class JacksonGenerator(
         mapType = dataType.asInstanceOf[MapType]))
   }
 
-  def writeLineEnding(): Unit = gen.writeRaw('\n')
+  def writeLineEnding(): Unit = {
+    // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+    gen.writeRaw(lineSeparator)
+  }
 }
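Because the generator writes through a UTF-8 writer, a non-ASCII separator lands in the output file as UTF-8 bytes. A small verification sketch (temporary paths, assuming a running `spark` session):

```python
import glob
import os
import tempfile

out = os.path.join(tempfile.mkdtemp(), "data")
spark.range(2).write.json(out, lineSep=u"§")

# Inspect the raw bytes of one part file: the separator is UTF-8 encoded.
part = glob.glob(os.path.join(out, "part-*"))[0]
with open(part, "rb") as f:
    assert u"§".encode("utf-8") in f.read()
```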

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 0 deletions

@@ -366,6 +366,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
    * per file</li>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
    * </ul>
    *
    * @since 2.0.0
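As the doc notes, omitting `lineSep` keeps the permissive default on read, so files with mixed line endings still parse; a sketch (local file path, assuming a running `spark` session):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "mixed.json")
with open(path, "wb") as f:
    # Records separated by \r\n, \n and a bare \r, all in one file.
    f.write(b'{"a": 1}\r\n{"a": 2}\n{"a": 3}\r{"a": 4}')

# No lineSep given: \r, \r\n and \n are all recognized by default.
assert spark.read.json("file://" + path).count() == 4
```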

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 2 additions & 0 deletions

@@ -518,6 +518,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`lineSep` (default `\n`): defines the line separator that should
+   * be used for writing.</li>
    * </ul>
    *
    * @since 1.4.0

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 12 additions & 5 deletions

@@ -35,7 +35,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -92,7 +92,8 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+    val json: Dataset[String] = createBaseDataset(
+      sparkSession, inputPaths, parsedOptions.lineSeparator)
     inferFromDataset(json, parsedOptions)
   }
 
@@ -104,13 +105,19 @@ object TextInputJsonDataSource extends JsonDataSource {
 
   private def createBaseDataset(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): Dataset[String] = {
+      inputPaths: Seq[FileStatus],
+      lineSeparator: Option[String]): Dataset[String] = {
+    val textOptions = lineSeparator.map { lineSep =>
+      Map(TextOptions.LINE_SEPARATOR -> lineSep)
+    }.getOrElse(Map.empty[String, String])
+
     val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
-        className = classOf[TextFileFormat].getName
+        className = classOf[TextFileFormat].getName,
+        options = textOptions
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
@@ -120,7 +127,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       file: PartitionedFile,
       parser: JacksonParser,
       schema: StructType): Iterator[InternalRow] = {
-    val linesReader = new HadoopFileLinesReader(file, conf)
+    val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
     val safeParser = new FailureSafeParser[Text](
       input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
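In short, this file threads the option through both phases: schema inference reuses the text datasource and forwards the separator via `TextOptions.LINE_SEPARATOR`, while per-partition parsing hands `lineSeparatorInRead` straight to `HadoopFileLinesReader`. When no separator is given, both paths fall back to the defaults.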

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
     lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
   }
 
-private[text] object TextOptions {
+private[datasources] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
   val LINE_SEPARATOR = "lineSep"
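Widening the object's visibility from `private[text]` to `private[datasources]` is what lets `JsonDataSource`, in the sibling `json` package, reference `TextOptions.LINE_SEPARATOR` above.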

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 2 additions & 0 deletions

@@ -268,6 +268,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
    * per file</li>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
+   * that should be used for parsing.</li>
    * </ul>
    *
    * @since 2.0.0
