Commit 371ac5f

MaxGekk authored and Robert Kruszewski committed
[SPARK-17916][SQL] Fix empty string being parsed as null when nullValue is set.
I propose to bump the uniVocity parser version up to 2.6.3, where quoted empty strings are replaced by the empty value (passed to `setEmptyValue`) instead of by `null`, as happens in the current version 2.5.9: https://github.com/uniVocity/univocity-parsers/blob/v2.6.3/src/main/java/com/univocity/parsers/csv/CsvParser.java#L125

The empty value for the writer is set to `""`, so an empty string in a dataframe/dataset is stored as the quoted empty string `""`. The empty value for the reader is set to the empty (zero-length) string. In this way, a saved quoted empty string is read back as just an empty string. Please look at the tests for more details.

Here are the main changes made in [2.6.0](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.0), [2.6.1](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.1), [2.6.2](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.2), and [2.6.3](https://github.com/uniVocity/univocity-parsers/releases/tag/v2.6.3):

- The CSV parser now parses quoted values ~30% faster.
- The CSV format detection process has an option to provide a list of possible delimiters, in order of priority (i.e. `settings.detectFormatAutomatically('-', '.');`) - uniVocity/univocity-parsers#214
- Implemented support for trimming quoted values - uniVocity/univocity-parsers#230
- Fixed a NullPointerException when stopping the parser before anything is parsed - uniVocity/univocity-parsers#219
- Fixed a concurrency issue when calling `stopParsing()` - uniVocity/univocity-parsers#231

Closes apache#20068

Added the tests from PR apache#20068.

Author: Maxim Gekk <[email protected]>

Closes apache#21273 from MaxGekk/univocity-2.6.
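To make the new semantics concrete, here is a minimal sketch of the round trip described above, mirroring the test added in this commit. It assumes a local SparkSession named `spark`; the output path is hypothetical.

```scala
import spark.implicits._

val df = Seq((1, "John Doe"), (2, ""), (3, null.asInstanceOf[String])).toDF("id", "name")

// "-" stands in for null on disk; "" is now written as a quoted empty string ("").
df.write.option("nullValue", "-").csv("/tmp/csv-empty-demo")

// Reading with the same nullValue: the quoted empty string comes back as "",
// no longer coerced to null, while "-" still maps to null.
val back = spark.read
  .option("nullValue", "-")
  .schema(df.schema)
  .csv("/tmp/csv-empty-demo")
// back contains (1, "John Doe"), (2, ""), (3, null)
```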
1 parent a11ec23 · commit 371ac5f

File tree

6 files changed (+131, -4 lines)


dev/deps/spark-deps-hadoop-2.6

Lines changed: 1 addition & 1 deletion
@@ -190,7 +190,7 @@ stax-api-1.0.1.jar
 stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
-univocity-parsers-2.5.9.jar
+univocity-parsers-2.6.3.jar
 validation-api-1.1.0.Final.jar
 xbean-asm5-shaded-4.4.jar
 xercesImpl-2.9.1.jar

dev/deps/spark-deps-hadoop-3.1

Lines changed: 1 addition & 1 deletion
@@ -211,7 +211,7 @@ stream-2.7.0.jar
 stringtemplate-3.2.1.jar
 super-csv-2.2.0.jar
 token-provider-1.0.1.jar
-univocity-parsers-2.5.9.jar
+univocity-parsers-2.6.3.jar
 validation-api-1.1.0.Final.jar
 woodstox-core-5.0.3.jar
 xbean-asm5-shaded-4.4.jar

sql/core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@
     <dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
-      <version>2.5.9</version>
+      <version>2.6.3</version>
       <type>jar</type>
     </dependency>
     <dependency>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

Lines changed: 2 additions & 1 deletion
@@ -164,7 +164,7 @@ class CSVOptions(
     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
     writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
     writerSettings.setNullValue(nullValue)
-    writerSettings.setEmptyValue(nullValue)
+    writerSettings.setEmptyValue("\"\"")
     writerSettings.setSkipEmptyLines(true)
     writerSettings.setQuoteAllFields(quoteAll)
     writerSettings.setQuoteEscapingEnabled(escapeQuotes)
@@ -185,6 +185,7 @@ class CSVOptions(
     settings.setInputBufferSize(inputBufferSize)
     settings.setMaxColumns(maxColumns)
     settings.setNullValue(nullValue)
+    settings.setEmptyValue("")
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
     settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
     settings
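In short: the writer now emits the quoted empty string `""` for empty values, and the reader maps a quoted empty field back to a zero-length string instead of `null`. A minimal parser-level sketch of that reader behavior (not part of the commit; it only uses the uniVocity settings calls shown in the diff):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val settings = new CsvParserSettings()
settings.setNullValue("-")  // how Spark's `nullValue` option reaches the parser
settings.setEmptyValue("")  // what a quoted empty field ("") becomes; 2.5.9 returned null here

val parser = new CsvParser(settings)
val row = parser.parseLine("a,\"\",")
// row(0) == "a"
// row(1) == ""   (quoted empty field -> emptyValue; was null in 2.5.9)
// row(2) == "-"  (genuinely empty field -> nullValue)
```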

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Column, Row, SparkSession}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure CSV read/write performance.
+ * To run this:
+ *  spark-submit --class <this class> --jars <spark sql test jar>
+ */
+object CSVBenchmarks {
+  val conf = new SparkConf()
+
+  val spark = SparkSession.builder
+    .master("local[1]")
+    .appName("benchmark-csv-datasource")
+    .config(conf)
+    .getOrCreate()
+  import spark.implicits._
+
+  def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def quotedValuesBenchmark(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark(s"Parsing quoted values", rowsNum)
+
+    withTempPath { path =>
+      val str = (0 until 10000).map(i => s""""$i"""").mkString(",")
+
+      spark.range(rowsNum)
+        .map(_ => str)
+        .write.option("header", true)
+        .csv(path.getAbsolutePath)
+
+      val schema = new StructType().add("value", StringType)
+      val ds = spark.read.option("header", true).schema(schema).csv(path.getAbsolutePath)
+
+      benchmark.addCase(s"One quoted string", numIters) { _ =>
+        ds.filter((_: Row) => true).count()
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      Parsing quoted values:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      ------------------------------------------------------------------------------------
+      One quoted string                30273 / 30549          0.0      605451.2       1.0X
+      */
+      benchmark.run()
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    quotedValuesBenchmark(rowsNum = 50 * 1000, numIters = 3)
+  }
+}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 46 additions & 0 deletions
@@ -1322,4 +1322,50 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     val sampled = spark.read.option("inferSchema", true).option("samplingRatio", 1.0).csv(ds)
     assert(sampled.count() == ds.count())
   }
+
+  test("SPARK-17916: An empty string should not be coerced to null when nullValue is passed.") {
+    val litNull: String = null
+    val df = Seq(
+      (1, "John Doe"),
+      (2, ""),
+      (3, "-"),
+      (4, litNull)
+    ).toDF("id", "name")
+
+    // Checks the new behavior where an empty string is not coerced to null when `nullValue` is
+    // set to anything but an empty string literal.
+    withTempPath { path =>
+      df.write
+        .option("nullValue", "-")
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .option("nullValue", "-")
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, ""),
+        (3, litNull),
+        (4, litNull)
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+    // Keeps the old behavior where an empty string is coerced to null when `nullValue` is not passed.
+    withTempPath { path =>
+      df.write
+        .csv(path.getAbsolutePath)
+      val computed = spark.read
+        .schema(df.schema)
+        .csv(path.getAbsolutePath)
+      val expected = Seq(
+        (1, "John Doe"),
+        (2, litNull),
+        (3, "-"),
+        (4, litNull)
+      ).toDF("id", "name")
+
+      checkAnswer(computed, expected)
+    }
+  }
 }
