
Commit f9837d3

gengliangwang authored and cloud-fan committed
[SPARK-27448][SQL] File source V2 table provider should be compatible with V1 provider
## What changes were proposed in this pull request?

In the rule `PreprocessTableCreation`, if an existing table is appended with a different provider, the action will fail. Currently, there are two implementations for file sources, and creating a table with the file source V2 provider will always fall back to the V1 `FileFormat`. We should consider the following cases valid:

1. Appending to a table with a file source V2 provider, using the V1 file format
2. Appending to a table with a V1 file format provider, using the file source V2 format

## How was this patch tested?

Unit test

Closes apache#24356 from gengliangwang/fixTableProvider.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent a4cf1a4 commit f9837d3
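
For concreteness, here is a minimal sketch of the newly valid scenario, assuming a spark-shell session (the table name `t` is hypothetical; the fully qualified format class names are the same ones the new tests exercise):

```scala
// A minimal sketch (not from the patch): create a table with the V2 CSV provider,
// then append with the V1 CSV FileFormat. Before this fix the second append failed
// because the two provider classes compared unequal.
import org.apache.spark.sql.SaveMode

spark.range(0, 10).toDF("id").write
  .mode(SaveMode.Append)
  .format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
  .saveAsTable("t")

// The reverse ordering (V1 first, then V2 append) is the second case above.
spark.range(10, 20).toDF("id").write
  .mode(SaveMode.Append)
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .saveAsTable("t")
```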

2 files changed: +77 additions, −1 deletion

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 9 additions & 1 deletion

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expres
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -113,7 +114,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
         val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get, conf)
         // TODO: Check that options from the resolved relation match the relation that we are
         // inserting into (i.e. using the same compression).
-        if (existingProvider != specifiedProvider) {
+        // If one of the providers is [[FileDataSourceV2]] and the other one is its corresponding
+        // [[FileFormat]], the two providers are considered compatible.
+        if (fallBackV2ToV1(existingProvider) != fallBackV2ToV1(specifiedProvider)) {
           throw new AnalysisException(s"The format of the existing table $tableName is " +
             s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
             s"`${specifiedProvider.getSimpleName}`.")
@@ -235,6 +238,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
     }
   }

+  private def fallBackV2ToV1(cls: Class[_]): Class[_] = cls.newInstance match {
+    case f: FileDataSourceV2 => f.fallbackFileFormat
+    case _ => cls
+  }
+
   private def normalizeCatalogTable(schema: StructType, table: CatalogTable): CatalogTable = {
     SchemaUtils.checkSchemaColumnNameDuplication(
       schema,
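
The heart of the fix is normalizing each provider class to its V1 fallback before comparing. Here is a self-contained sketch of that idea, using stand-in types rather than Spark's real classes:

```scala
// Stand-in types: Spark's real FileDataSourceV2 lives in
// org.apache.spark.sql.execution.datasources.v2 and exposes fallbackFileFormat.
trait FileDataSourceV2 { def fallbackFileFormat: Class[_] }

class CsvFileFormat // stand-in for the V1 CSV format
class CsvDataSourceV2 extends FileDataSourceV2 {
  override def fallbackFileFormat: Class[_] = classOf[CsvFileFormat]
}

object ProviderCompat {
  // Map a V2 file source class to its V1 fallback; leave any other class unchanged.
  def fallBackV2ToV1(cls: Class[_]): Class[_] =
    cls.getDeclaredConstructor().newInstance() match {
      case f: FileDataSourceV2 => f.fallbackFileFormat
      case _ => cls
    }

  def main(args: Array[String]): Unit = {
    // Both orderings compare equal once normalized, which is why the
    // "different provider" check now passes in both directions.
    assert(fallBackV2ToV1(classOf[CsvDataSourceV2]) == fallBackV2ToV1(classOf[CsvFileFormat]))
    assert(fallBackV2ToV1(classOf[CsvFileFormat]) == classOf[CsvFileFormat])
  }
}
```

Note that the patch itself calls `cls.newInstance`; the sketch uses `getDeclaredConstructor().newInstance()` only because `newInstance` is deprecated on newer JDKs — the behavior for no-arg public constructors is the same.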

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 68 additions & 0 deletions

@@ -968,6 +968,74 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }

+  test("append a table with file source V2 provider using the v1 file format") {
+    def createDF(from: Int, to: Int): DataFrame = {
+      (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
+    }
+
+    withTable("appendCSV") {
+      createDF(0, 9)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
+        .saveAsTable("appendCSV")
+      createDF(10, 19)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
+        .saveAsTable("appendCSV")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendCSV p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+
+    withTable("appendCSV") {
+      createDF(0, 9).write.mode(SaveMode.Append).format("csv").saveAsTable("appendCSV")
+      createDF(10, 19)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
+        .saveAsTable("appendCSV")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendCSV p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+  }
+
+  test("append a table with v1 file format provider using file source V2 format") {
+    def createDF(from: Int, to: Int): DataFrame = {
+      (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
+    }
+
+    withTable("appendCSV") {
+      createDF(0, 9)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
+        .saveAsTable("appendCSV")
+      createDF(10, 19)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
+        .saveAsTable("appendCSV")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendCSV p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+
+    withTable("appendCSV") {
+      createDF(0, 9)
+        .write
+        .mode(SaveMode.Append)
+        .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
+        .saveAsTable("appendCSV")
+      createDF(10, 19).write.mode(SaveMode.Append).format("csv").saveAsTable("appendCSV")
+      checkAnswer(
+        sql("SELECT p.c1, p.c2 FROM appendCSV p WHERE p.c1 > 5"),
+        (6 to 19).map(i => Row(i, s"str$i")))
+    }
+  }
+
   test("SPARK-8156:create table to specific database by 'use dbname' ") {

     val df = (1 to 3).map(i => (i, s"val_$i", i * 2)).toDF("a", "b", "c")
