Skip to content

Commit d4d6df2

Browse files
committed
[SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959
## What changes were proposed in this pull request?

This PR reverts the JSON count optimization part of apache#21909. We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be 0 records, 1 record, or multiple records, and this is dependent on each input. See also apache#23665 (comment).

## How was this patch tested?

Manually tested.

Closes apache#23667 from HyukjinKwon/revert-SPARK-24959.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent aeff69b commit d4d6df2

File tree

8 files changed: +19 / -30 lines changed

8 files changed: +19 / -30 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,19 @@ class UnivocityParser(
188188
}
189189
}
190190

191+
private val doParse = if (requiredSchema.nonEmpty) {
192+
(input: String) => convert(tokenizer.parseLine(input))
193+
} else {
194+
// If `columnPruning` enabled and partition attributes scanned only,
195+
// `schema` gets empty.
196+
(_: String) => InternalRow.empty
197+
}
198+
191199
/**
192200
* Parses a single CSV string and turns it into either one resulting row or no row (if the
193201
* the record is malformed).
194202
*/
195-
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
203+
def parse(input: String): InternalRow = doParse(input)
196204

197205
private val getToken = if (options.columnPruning) {
198206
(tokens: Array[String], index: Int) => tokens(index)
@@ -282,8 +290,7 @@ private[sql] object UnivocityParser {
282290
input => Seq(parser.convert(input)),
283291
parser.options.parseMode,
284292
schema,
285-
parser.options.columnNameOfCorruptRecord,
286-
parser.options.multiLine)
293+
parser.options.columnNameOfCorruptRecord)
287294

288295
val handleHeader: () => Unit =
289296
() => headerChecker.checkHeaderColumnNames(tokenizer)
@@ -336,8 +343,7 @@ private[sql] object UnivocityParser {
336343
input => Seq(parser.parse(input)),
337344
parser.options.parseMode,
338345
schema,
339-
parser.options.columnNameOfCorruptRecord,
340-
parser.options.multiLine)
346+
parser.options.columnNameOfCorruptRecord)
341347
filteredLines.flatMap(safeParser.parse)
342348
}
343349
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,7 @@ case class CsvToStructs(
117117
input => Seq(rawParser.parse(input)),
118118
mode,
119119
nullableSchema,
120-
parsedOptions.columnNameOfCorruptRecord,
121-
parsedOptions.multiLine)
120+
parsedOptions.columnNameOfCorruptRecord)
122121
}
123122

124123
override def dataType: DataType = nullableSchema

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,8 +582,7 @@ case class JsonToStructs(
582582
input => rawParser.parse(input, createParser, identity[UTF8String]),
583583
mode,
584584
parserSchema,
585-
parsedOptions.columnNameOfCorruptRecord,
586-
parsedOptions.multiLine)
585+
parsedOptions.columnNameOfCorruptRecord)
587586
}
588587

589588
override def dataType: DataType = nullableSchema

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ class FailureSafeParser[IN](
2727
rawParser: IN => Seq[InternalRow],
2828
mode: ParseMode,
2929
schema: StructType,
30-
columnNameOfCorruptRecord: String,
31-
isMultiLine: Boolean) {
30+
columnNameOfCorruptRecord: String) {
3231

3332
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
3433
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -56,15 +55,9 @@ class FailureSafeParser[IN](
5655
}
5756
}
5857

59-
private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
60-
6158
def parse(input: IN): Iterator[InternalRow] = {
6259
try {
63-
if (skipParsing) {
64-
Iterator.single(InternalRow.empty)
65-
} else {
66-
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
67-
}
60+
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
6861
} catch {
6962
case e: BadRecordException => mode match {
7063
case PermissiveMode =>

sql/core/benchmarks/JSONBenchmark-results.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ Select a subset of 10 columns: Best/Avg Time(ms) Rate(M/s) Per Ro
4141
------------------------------------------------------------------------------------------------
4242
Select 10 columns + count() 19539 / 19896 0.5 1953.9 1.0X
4343
Select 1 column + count() 16412 / 16445 0.6 1641.2 1.2X
44-
count() 2783 / 2801 3.6 278.3 7.0X
4544

4645
Preparing data for benchmarking ...
4746
Java HotSpot(TM) 64-Bit Server VM 1.8.0_191-b12 on Linux 3.16.0-31-generic

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
468468
input => rawParser.parse(input, createParser, UTF8String.fromString),
469469
parsedOptions.parseMode,
470470
schema,
471-
parsedOptions.columnNameOfCorruptRecord,
472-
parsedOptions.multiLine)
471+
parsedOptions.columnNameOfCorruptRecord)
473472
iter.flatMap(parser.parse)
474473
}
475474
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -538,8 +537,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
538537
input => Seq(rawParser.parse(input)),
539538
parsedOptions.parseMode,
540539
schema,
541-
parsedOptions.columnNameOfCorruptRecord,
542-
parsedOptions.multiLine)
540+
parsedOptions.columnNameOfCorruptRecord)
543541
iter.flatMap(parser.parse)
544542
}
545543
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,7 @@ object TextInputJsonDataSource extends JsonDataSource {
140140
input => parser.parse(input, textParser, textToUTF8String),
141141
parser.options.parseMode,
142142
schema,
143-
parser.options.columnNameOfCorruptRecord,
144-
parser.options.multiLine)
143+
parser.options.columnNameOfCorruptRecord)
145144
linesReader.flatMap(safeParser.parse)
146145
}
147146

@@ -225,8 +224,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
225224
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
226225
parser.options.parseMode,
227226
schema,
228-
parser.options.columnNameOfCorruptRecord,
229-
parser.options.multiLine)
227+
parser.options.columnNameOfCorruptRecord)
230228

231229
safeParser.parse(
232230
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,6 @@ object JSONBenchmark extends SqlBasedBenchmark {
217217
benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
218218
ds.select($"col1").filter((_: Row) => true).count()
219219
}
220-
benchmark.addCase(s"count()", numIters) { _ =>
221-
ds.count()
222-
}
223220

224221
benchmark.run()
225222
}

0 commit comments

Comments (0)