
Commit 6242885

gengliangwang authored and dongjoon-hyun committed
[SPARK-27085][SQL] Migrate CSV to File Data Source V2
## What changes were proposed in this pull request?

Migrate CSV to File Data Source V2.

## How was this patch tested?

Unit test

Closes apache#24005 from gengliangwang/CSVDataSourceV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 240c6a4 commit 6242885

File tree

18 files changed: +544 -74 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -1482,7 +1482,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("orc")
+    .createWithDefault("csv,orc")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +

sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.noop.NoopDataSource
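
The service file above is what makes the short name "csv" resolve to the new class: Spark's DataSource lookup is built on java.util.ServiceLoader. A minimal standalone sketch of that mechanism (not code from this commit):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Enumerate every registered source and find the provider whose short name is "csv".
// After this commit that provider is CSVDataSourceV2 rather than CSVFileFormat.
val providers = ServiceLoader.load(classOf[DataSourceRegister]).asScala
providers.find(_.shortName() == "csv").foreach(p => println(p.getClass.getName))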

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 2 additions & 2 deletions
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -238,7 +238,7 @@ case class AlterTableAddColumnsCommand(
       // TextFileFormat only default to one column "value"
       // Hive type is already considered as hive serde table, so the logic will not
       // come in here.
-      case _: JsonFileFormat | _: CSVFileFormat | _: ParquetFileFormat | _: OrcDataSourceV2 =>
+      case _: JsonFileFormat | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
       case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
       case s =>
         throw new AnalysisException(
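
AlterTableAddColumnsCommand whitelists data sources by class, so the match arm must name the V2 class once "csv" resolves to it. A hedged sketch of the user-visible behavior this preserves (table name hypothetical):

// ALTER TABLE ADD COLUMNS remains allowed for CSV-backed tables.
spark.sql("CREATE TABLE t (a INT) USING csv")
spark.sql("ALTER TABLE t ADD COLUMNS (b STRING)")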

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 1 addition & 29 deletions
@@ -118,7 +118,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       throw new AnalysisException(
         "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
         "referenced columns only include the internal corrupt record column\n" +
-        s"(named _corrupt_record by default). For example:\n" +
+        "(named _corrupt_record by default). For example:\n" +
         "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
         "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().\n" +
         "Instead, you can cache or save the parsed results and then send the same query.\n" +
@@ -163,31 +163,3 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
 }
 
-private[csv] class CsvOutputWriter(
-    path: String,
-    dataSchema: StructType,
-    context: TaskAttemptContext,
-    params: CSVOptions) extends OutputWriter with Logging {
-
-  private var univocityGenerator: Option[UnivocityGenerator] = None
-
-  if (params.headerFlag) {
-    val gen = getGen()
-    gen.writeHeaders()
-  }
-
-  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
-    val charset = Charset.forName(params.charset)
-    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
-    val newGen = new UnivocityGenerator(dataSchema, os, params)
-    univocityGenerator = Some(newGen)
-    newGen
-  }
-
-  override def write(row: InternalRow): Unit = {
-    val gen = getGen()
-    gen.write(row)
-  }
-
-  override def close(): Unit = univocityGenerator.foreach(_.close())
-}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CsvOutputWriter.scala

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.nio.charset.Charset
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityGenerator}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class CsvOutputWriter(
+    path: String,
+    dataSchema: StructType,
+    context: TaskAttemptContext,
+    params: CSVOptions) extends OutputWriter with Logging {
+
+  private var univocityGenerator: Option[UnivocityGenerator] = None
+
+  if (params.headerFlag) {
+    val gen = getGen()
+    gen.writeHeaders()
+  }
+
+  private def getGen(): UnivocityGenerator = univocityGenerator.getOrElse {
+    val charset = Charset.forName(params.charset)
+    val os = CodecStreams.createOutputStreamWriter(context, new Path(path), charset)
+    val newGen = new UnivocityGenerator(dataSchema, os, params)
+    univocityGenerator = Some(newGen)
+    newGen
+  }
+
+  override def write(row: InternalRow): Unit = {
+    val gen = getGen()
+    gen.write(row)
+  }
+
+  override def close(): Unit = univocityGenerator.foreach(_.close())
+}
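
CsvOutputWriter creates its UnivocityGenerator lazily: when headerFlag is set the constructor forces creation and writes the header row, but otherwise no output stream is opened until the first write(), and close() is a no-op for partitions that never wrote a row. A distilled, Spark-free sketch of that pattern (class and method names here are mine, not from the commit):

import java.io.Writer

// The underlying resource is opened on first use; close() only closes what was opened.
class LazyLineWriter(open: () => Writer) {
  private var underlying: Option[Writer] = None
  private def get: Writer = underlying.getOrElse {
    val w = open()
    underlying = Some(w)
    w
  }
  def writeLine(s: String): Unit = { get.write(s); get.write("\n") }
  def close(): Unit = underlying.foreach(_.close())
}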

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

Lines changed: 4 additions & 0 deletions
@@ -47,4 +47,8 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
       Option(map.get("path")).toSeq
     }
   }
+
+  protected def getTableName(paths: Seq[String]): String = {
+    shortName() + ":" + paths.mkString(";")
+  }
 }
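
A worked restatement of the helper added above, under the assumption (visible in the diff) that it is plain string concatenation:

def tableName(shortName: String, paths: Seq[String]): String =
  shortName + ":" + paths.mkString(";")

// For the CSV source, shortName() is "csv", so with two hypothetical paths:
assert(tableName("csv", Seq("/data/a.csv", "/data/b.csv")) == "csv:/data/a.csv;/data/b.csv")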

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PartitionReaderFromIterator.scala

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+
+class PartitionReaderFromIterator[InternalRow](
+    iter: Iterator[InternalRow]) extends PartitionReader[InternalRow] {
+  private var currentValue: InternalRow = _
+
+  override def next(): Boolean = {
+    if (iter.hasNext) {
+      currentValue = iter.next()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def get(): InternalRow = currentValue
+
+  override def close(): Unit = {}
+}
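
A usage sketch for the adapter above: it bridges a plain Scala Iterator onto the pull-style PartitionReader contract (call next() before each get()). The Int elements here are stand-ins; the real scan instantiates it with InternalRow.

val reader = new PartitionReaderFromIterator(Iterator(1, 2, 3))
while (reader.next()) {
  println(reader.get()) // prints 1, 2, 3
}
reader.close() // no-op: a bare iterator owns no resources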

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+abstract class TextBasedFileScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScan(sparkSession, fileIndex, readSchema, options) {
+  private var codecFactory: CompressionCodecFactory = _
+
+  override def isSplitable(path: Path): Boolean = {
+    if (codecFactory == null) {
+      codecFactory = new CompressionCodecFactory(
+        sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
+    }
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
+}
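
The splittability rule above can be exercised outside Spark with the same Hadoop classes. A hedged sketch (file names hypothetical): gzip output is one undivided stream, while bzip2 implements SplittableCompressionCodec, and uncompressed files have no codec at all.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

val factory = new CompressionCodecFactory(new Configuration())
def splittable(name: String): Boolean = {
  val codec = factory.getCodec(new Path(name))
  codec == null || codec.isInstanceOf[SplittableCompressionCodec]
}
splittable("part-0.csv") // true: no codec, byte ranges can be read independently
splittable("part-0.gz")  // false: GzipCodec is not splittable
splittable("part-0.bz2") // true: BZip2Codec is a SplittableCompressionCodec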

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVDataSourceV2.scala

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.csv
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class CSVDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallBackFileFormat: Class[_ <: FileFormat] = classOf[CSVFileFormat]
+
+  override def shortName(): String = "csv"
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    CSVTable(tableName, sparkSession, options, paths, None)
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    CSVTable(tableName, sparkSession, options, paths, Some(schema))
+  }
+}
+
+object CSVDataSourceV2 {
+  def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+    case _ => false
+  }
+}
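
End-user code is unchanged by the migration; the short name "csv" now simply resolves to this class. A hedged usage sketch (paths hypothetical):

val df = spark.read.option("header", "true").csv("/data/people.csv") // read path: CSVDataSourceV2
df.write.csv("/tmp/out") // write path: falls back to CSVFileFormat per the SQLConf default above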

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.csv
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create CSV readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
+ * @param dataSchema Schema of CSV files.
+ * @param partitionSchema Schema of partitions.
+ * @param readSchema Required schema in the batch scan.
+ * @param parsedOptions Options for parsing CSV files.
+ */
+case class CSVPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    dataSchema: StructType,
+    partitionSchema: StructType,
+    readSchema: StructType,
+    parsedOptions: CSVOptions) extends FilePartitionReaderFactory {
+  private val columnPruning = sqlConf.csvColumnPruning
+  private val readDataSchema =
+    getReadDataSchema(readSchema, partitionSchema, sqlConf.caseSensitiveAnalysis)
+
+  override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
+    val conf = broadcastedConf.value.value
+
+    val parser = new UnivocityParser(
+      StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
+      parsedOptions)
+    val schema = if (columnPruning) readDataSchema else dataSchema
+    val isStartOfFile = file.start == 0
+    val headerChecker = new CSVHeaderChecker(
+      schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile)
+    val iter = CSVDataSource(parsedOptions).readFile(
+      conf,
+      file,
+      parser,
+      headerChecker,
+      readDataSchema)
+    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+    new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+      partitionSchema, file.partitionValues)
+  }
+}
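
One detail worth noting in buildReader: when columnPruning is on, the UnivocityParser is handed only readDataSchema, so CSV columns the query never references are skipped during parsing rather than converted and discarded. A hedged illustration (schema and path hypothetical; the conf key is my best recollection of the flag behind sqlConf.csvColumnPruning):

spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "true")
// With pruning on, only column "a" is materialized per record; "b" is skipped by the parser.
spark.read.schema("a INT, b STRING").csv("/data/in.csv").select("a").show()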
