
Commit 4decedf

wangyum authored and gatorsmile committed
[SPARK-22002][SQL] Read JDBC table using custom schema: support specifying partial fields

## What changes were proposed in this pull request?

apache#18266 added a new feature to support reading a JDBC table with a custom schema, but it required specifying all of the fields. For simplicity, this PR supports specifying only some of the fields; the remaining columns keep the default type mapping.

## How was this patch tested?

Unit tests.

Author: Yuming Wang <[email protected]>

Closes apache#19231 from wangyum/SPARK-22002.
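For context, a minimal usage sketch of the feature this commit completes. The JDBC URL and table name below are illustrative placeholders; with a partial `customSchema`, columns that are not listed keep Spark's default JDBC type mapping.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch; the H2 URL and table name are placeholders.
val spark = SparkSession.builder().appName("partial-custom-schema").getOrCreate()

// Only "id" gets a user-specified type; all other columns fall back to the
// default JDBC-to-Catalyst type mapping.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")
  .option("dbtable", "TEST.PEOPLE")
  .option("customSchema", "id DECIMAL(38, 0)")
  .load()
```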
1 parent 22b111e commit 4decedf

File tree

4 files changed (+40, -62 lines)

docs/sql-programming-guide.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
   <tr>
     <td><code>customSchema</code></td>
     <td>
-      The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
 </table>
```
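The documented option also works through the SQL/DDL route, as the JDBCSuite test further down exercises. A hedged sketch, assuming a `SparkSession` named `spark` and placeholder connection options:

```scala
// Sketch of the DDL route; the url, user, and password values are placeholders.
spark.sql(
  """
    |CREATE TEMPORARY VIEW people_view
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (url 'jdbc:h2:mem:testdb', dbtable 'TEST.PEOPLE',
    |  user 'testUser', password 'testPass',
    |  customSchema 'id DECIMAL(38, 0)')
  """.stripMargin)
```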

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 17 additions & 19 deletions
```diff
@@ -301,12 +301,11 @@ object JdbcUtils extends Logging {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-        .putLong("scale", fieldScale)
+      val metadata = new MetadataBuilder().putLong("scale", fieldScale)
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, fieldSize, fieldScale, isSigned))
-      fields(i) = StructField(columnName, columnType, nullable, metadata.build())
+      fields(i) = StructField(columnName, columnType, nullable)
       i = i + 1
     }
     new StructType(fields)
@@ -768,31 +767,30 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Parses the user specified customSchema option value to DataFrame schema,
-   * and returns it if it's all columns are equals to default schema's.
+   * Parses the user specified customSchema option value to DataFrame schema, and
+   * returns a schema that is replaced by the custom schema's dataType if column name is matched.
    */
   def getCustomSchema(
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
-    val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
+    if (null != customSchema && customSchema.nonEmpty) {
+      val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the customSchema option value", nameEquality)
-
-    val colNames = tableSchema.fieldNames.mkString(",")
-    val errorMsg = s"Please provide all the columns, all columns are: $colNames"
-    if (userSchema.size != tableSchema.size) {
-      throw new AnalysisException(errorMsg)
-    }
+      SchemaUtils.checkColumnNameDuplication(
+        userSchema.map(_.name), "in the customSchema option value", nameEquality)
 
-    // This is resolved by names, only check the column names.
-    userSchema.fieldNames.foreach { col =>
-      tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new AnalysisException(errorMsg)
+      // This is resolved by names, use the custom field dataType to replace the default dataType.
+      val newSchema = tableSchema.map { col =>
+        userSchema.find(f => nameEquality(f.name, col.name)) match {
+          case Some(c) => col.copy(dataType = c.dataType)
+          case None => col
+        }
       }
+      StructType(newSchema)
+    } else {
+      tableSchema
     }
-    userSchema
   }
 
   /**
```
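The heart of the change is the name-resolved merge that `getCustomSchema` now performs. A standalone sketch of the same logic, simplified from the diff above (the `mergeSchemas` helper name is hypothetical):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the merge in getCustomSchema: for each table
// column, adopt the user-supplied dataType when the names match according to
// the resolver; otherwise keep the default column unchanged.
def mergeSchemas(
    tableSchema: StructType,
    userSchema: StructType,
    nameEquality: (String, String) => Boolean): StructType = {
  StructType(tableSchema.map { col =>
    userSchema.find(f => nameEquality(f.name, col.name)) match {
      case Some(c) => col.copy(dataType = c.dataType)
      case None => col
    }
  })
}
```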

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala

Lines changed: 14 additions & 33 deletions
```diff
@@ -30,57 +30,38 @@ class JdbcUtilsSuite extends SparkFunSuite {
   val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
 
   test("Parse user specified column types") {
-    assert(
-      JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) ===
-      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
+    assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)
+
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
+      StructType(Seq(StructField("C1", DateType, false), StructField("C2", IntegerType, false))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false))))
+
     assert(
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(
-      tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("c1", DecimalType(38, 0), true),
-        StructField("C2", StringType, true))))
+        StructType(Seq(StructField("C1", DateType, false), StructField("C2", StringType, false))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, false))))
 
     // Throw AnalysisException
     val duplicate = intercept[AnalysisException]{
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("c1", StringType, true)))
+        StructType(Seq(StructField("c1", DateType, false), StructField("c1", StringType, false)))
     }
     assert(duplicate.getMessage.contains(
       "Found duplicate column(s) in the customSchema option value"))
 
-    val allColumns = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) ===
-        StructType(Seq(StructField("C1", DateType, true)))
-    }
-    assert(allColumns.getMessage.contains("Please provide all the columns,"))
-
-    val caseSensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseSensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
-    val caseInsensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseInsensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
     // Throw ParseException
     val dataTypeNotSupported = intercept[ParseException]{
       JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
+        StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
     }
     assert(dataTypeNotSupported.getMessage.contains("DataType datee is not supported"))
 
     val mismatchedInput = intercept[ParseException]{
       JdbcUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
+        StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
     }
     assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
   }
```

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 8 additions & 9 deletions
```diff
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -970,30 +971,28 @@ class JDBCSuite extends SparkFunSuite
 
   test("jdbc API support custom schema") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val customSchema = "NAME STRING, THEID INT"
     val props = new Properties()
-    props.put("customSchema", "NAME STRING, THEID BIGINT")
-    val schema = StructType(Seq(
-      StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
+    props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
     assert(df.count() === 3)
   }
 
   test("jdbc API custom schema DDL-like strings.") {
     withTempView("people_view") {
+      val customSchema = "NAME STRING, THEID INT"
       sql(
         s"""
           |CREATE TEMPORARY VIEW people_view
           |USING org.apache.spark.sql.jdbc
           |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass',
-          |customSchema 'NAME STRING, THEID INT')
+          |customSchema '$customSchema')
         """.stripMargin.replaceAll("\n", " "))
-      val schema = StructType(
-        Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
       val df = sql("select * from people_view")
-      assert(df.schema.size === 2)
-      assert(df.schema === schema)
+      assert(df.schema.length === 2)
+      assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
       assert(df.count() === 3)
     }
   }
```
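Both rewritten tests derive the expected schema via `CatalystSqlParser.parseTableSchema`, which parses a DDL-style column list into a `StructType` (fields default to nullable). A small illustration:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// parseTableSchema builds a StructType from a DDL-like string, so the tests
// no longer hand-construct the expected schema.
val parsed = CatalystSqlParser.parseTableSchema("NAME STRING, THEID INT")
assert(parsed === StructType(Seq(
  StructField("NAME", StringType),
  StructField("THEID", IntegerType))))
```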
