
Commit 32d44b1

shivsood authored and dongjoon-hyun committed
[SPARK-29644][SQL] Corrected ShortType and ByteType mapping to SmallInt and TinyInt in JDBCUtils
### What changes were proposed in this pull request?

Corrected the ShortType and ByteType mappings to SMALLINT and TINYINT, and corrected the setter methods so that ShortType and ByteType values are bound with setShort() and setByte(). The changes are in JdbcUtils.scala. Fixed unit test cases where applicable and added new end-to-end test cases that read/write tables with ShortType and ByteType columns.

#### Problems

- In master, JdbcUtils.scala lines 547 and 551 set ShortType and ByteType values as Integer rather than as Short and Byte respectively (the issue was pointed out by maropu):

```
case ShortType =>
  (stmt: PreparedStatement, row: Row, pos: Int) =>
    stmt.setInt(pos + 1, row.getShort(pos))

case ByteType =>
  (stmt: PreparedStatement, row: Row, pos: Int) =>
    stmt.setInt(pos + 1, row.getByte(pos))
```

- At JdbcUtils.scala line 247, TINYINT is wrongly interpreted as IntegerType in getCatalystType():

```
case java.sql.Types.TINYINT => IntegerType
```

- At line 172, ShortType is wrongly mapped to the INTEGER database type:

```
case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
```

- Throughout the tests, ShortType and ByteType were being interpreted as IntegerType.

### Why are the changes needed?

A given type should be set using the right JDBC type.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Corrected unit test cases where applicable and validated them in CI/CD. Added test cases in MsSqlServerIntegrationSuite.scala, PostgresIntegrationSuite.scala, and MySQLIntegrationSuite.scala that write and read back tables from a DataFrame with ShortType and ByteType columns. Validated manually as follows:

```
./build/mvn install -DskipTests
./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12
```

Closes apache#26301 from shivsood/shorttype_fix_maropu.

Authored-by: shivsood <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 15a72f3 commit 32d44b1
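A rough sketch of the user-visible effect (not part of this commit): with the corrected mappings, SMALLINT and TINYINT columns read back as ShortType and ByteType instead of IntegerType. The in-memory H2 URL and the `narrow_types` table name below are hypothetical, and the H2 driver is assumed to be on the classpath.

```scala
// Minimal sketch; assumes a local Spark session and the H2 driver on the
// classpath. The URL and table name are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("SPARK-29644-demo").getOrCreate()
import spark.implicits._

val url = "jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1"
val df = Seq((1.toShort, 2.toByte)).toDF("s", "b")

// ShortType now maps to a SMALLINT column and ByteType to TINYINT on write,
// with values bound via setShort()/setByte() rather than setInt().
df.write.format("jdbc")
  .mode("overwrite")
  .option("url", url)
  .option("dbtable", "narrow_types")
  .save()

// On read, SMALLINT and TINYINT now surface as ShortType and ByteType
// (previously both came back as IntegerType).
val readBack = spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", "narrow_types")
  .load()
readBack.printSchema() // s: short, b: byte
```

The JDBCWriteSuite tests in this commit exercise the same round trip against Spark's in-memory H2 test database, and the docker integration suites do so against real MsSqlServer and MySQL instances.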

5 files changed: +97 −13 lines changed


external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala

Lines changed: 45 additions & 3 deletions

```diff
@@ -59,7 +59,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
       """
         |INSERT INTO numbers VALUES (
         |0,
-        |255, 32767, 2147483647, 9223372036854775807,
+        |127, 32767, 2147483647, 9223372036854775807,
         |123456789012345.123456789012345, 123456789012345.123456789012345,
         |123456789012345.123456789012345,
         |123, 12345.12,
@@ -119,7 +119,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     val types = row.toSeq.map(x => x.getClass.toString)
     assert(types.length == 12)
     assert(types(0).equals("class java.lang.Boolean"))
-    assert(types(1).equals("class java.lang.Integer"))
+    assert(types(1).equals("class java.lang.Byte"))
     assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Long"))
@@ -131,7 +131,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types(10).equals("class java.math.BigDecimal"))
     assert(types(11).equals("class java.math.BigDecimal"))
     assert(row.getBoolean(0) == false)
-    assert(row.getInt(1) == 255)
+    assert(row.getByte(1) == 127)
     assert(row.getShort(2) == 32767)
     assert(row.getInt(3) == 2147483647)
     assert(row.getLong(4) == 9223372036854775807L)
@@ -202,4 +202,46 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("SPARK-29644: Write tables with ShortType") {
+    import testImplicits._
+    val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32768.toShort).toDF("a")
+    val tablename = "shorttable"
+    df.write
+      .format("jdbc")
+      .mode("overwrite")
+      .option("url", jdbcUrl)
+      .option("dbtable", tablename)
+      .save()
+    val df2 = spark.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", tablename)
+      .load()
+    assert(df.count == df2.count)
+    val rows = df2.collect()
+    val colType = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(colType(0) == "class java.lang.Short")
+  }
+
+  test("SPARK-29644: Write tables with ByteType") {
+    import testImplicits._
+    val df = Seq(-127.toByte, 0.toByte, 1.toByte, 38.toByte, 128.toByte).toDF("a")
+    val tablename = "bytetable"
+    df.write
+      .format("jdbc")
+      .mode("overwrite")
+      .option("url", jdbcUrl)
+      .option("dbtable", tablename)
+      .save()
+    val df2 = spark.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", tablename)
+      .load()
+    assert(df.count == df2.count)
+    val rows = df2.collect()
+    val colType = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(colType(0) == "class java.lang.Byte")
+  }
 }
```

external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -84,7 +84,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types.length == 9)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Long"))
-    assert(types(2).equals("class java.lang.Integer"))
+    assert(types(2).equals("class java.lang.Short"))
     assert(types(3).equals("class java.lang.Integer"))
     assert(types(4).equals("class java.lang.Integer"))
     assert(types(5).equals("class java.lang.Long"))
@@ -93,7 +93,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(types(8).equals("class java.lang.Double"))
     assert(rows(0).getBoolean(0) == false)
     assert(rows(0).getLong(1) == 0x225)
-    assert(rows(0).getInt(2) == 17)
+    assert(rows(0).getShort(2) == 17)
     assert(rows(0).getInt(3) == 77777)
     assert(rows(0).getInt(4) == 123456789)
     assert(rows(0).getLong(5) == 123456789012345L)
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 6 additions & 6 deletions

```diff
@@ -170,8 +170,8 @@ object JdbcUtils extends Logging {
       case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
       case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
       case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
-      case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
-      case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
+      case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+      case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
       case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
       case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
       case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
@@ -235,7 +235,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.REF => StringType
       case java.sql.Types.REF_CURSOR => null
      case java.sql.Types.ROWID => LongType
-      case java.sql.Types.SMALLINT => IntegerType
+      case java.sql.Types.SMALLINT => ShortType
       case java.sql.Types.SQLXML => StringType
       case java.sql.Types.STRUCT => StringType
       case java.sql.Types.TIME => TimestampType
@@ -244,7 +244,7 @@ object JdbcUtils extends Logging {
       case java.sql.Types.TIMESTAMP => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
        => null
-      case java.sql.Types.TINYINT => IntegerType
+      case java.sql.Types.TINYINT => ByteType
       case java.sql.Types.VARBINARY => BinaryType
       case java.sql.Types.VARCHAR => StringType
       case _ =>
@@ -546,11 +546,11 @@ object JdbcUtils extends Logging {
 
     case ShortType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setInt(pos + 1, row.getShort(pos))
+        stmt.setShort(pos + 1, row.getShort(pos))
 
     case ByteType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setInt(pos + 1, row.getByte(pos))
+        stmt.setByte(pos + 1, row.getByte(pos))
 
     case BooleanType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
```
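The mapping changed above is only the generic fallback; a registered JdbcDialect is consulted first, so a database without a native TINYINT can still substitute its own type. A minimal sketch of such an override, where the dialect object and URL prefix are hypothetical and not part of this commit:

```scala
// Hypothetical dialect: fall back to SMALLINT for ByteType on a database
// that lacks a TINYINT column type. Returning None defers to the generic
// mapping in JdbcUtils.getCommonJDBCType.
import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{ByteType, DataType}

object NoTinyIntDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
    case _ => None
  }
}

JdbcDialects.registerDialect(NoTinyIntDialect)
```

This is the same mechanism Spark's built-in dialects use to override getJDBCType when the generic mapping does not fit the target database.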

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -578,8 +578,8 @@ class JDBCSuite extends QueryTest
     assert(rows.length === 1)
     assert(rows(0).getInt(0) === 1)
     assert(rows(0).getBoolean(1) === false)
-    assert(rows(0).getInt(2) === 3)
-    assert(rows(0).getInt(3) === 4)
+    assert(rows(0).getByte(2) === 3.toByte)
+    assert(rows(0).getShort(3) === 4.toShort)
     assert(rows(0).getLong(4) === 1234567890123L)
   }
 
```

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala

Lines changed: 42 additions & 0 deletions

```diff
@@ -574,6 +574,48 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     }
   }
 
+  test("SPARK-29644: Write tables with ShortType") {
+    import testImplicits._
+    val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32768.toShort).toDF("a")
+    val tablename = "shorttable"
+    df.write
+      .format("jdbc")
+      .mode("overwrite")
+      .option("url", url)
+      .option("dbtable", tablename)
+      .save()
+    val df2 = spark.read
+      .format("jdbc")
+      .option("url", url)
+      .option("dbtable", tablename)
+      .load()
+    assert(df.count == df2.count)
+    val rows = df2.collect()
+    val colType = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(colType(0) == "class java.lang.Short")
+  }
+
+  test("SPARK-29644: Write tables with ByteType") {
+    import testImplicits._
+    val df = Seq(-127.toByte, 0.toByte, 1.toByte, 38.toByte, 128.toByte).toDF("a")
+    val tablename = "bytetable"
+    df.write
+      .format("jdbc")
+      .mode("overwrite")
+      .option("url", url)
+      .option("dbtable", tablename)
+      .save()
+    val df2 = spark.read
+      .format("jdbc")
+      .option("url", url)
+      .option("dbtable", tablename)
+      .load()
+    assert(df.count == df2.count)
+    val rows = df2.collect()
+    val colType = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(colType(0) == "class java.lang.Byte")
+  }
+
   private def runAndVerifyRecordsWritten(expected: Long)(job: => Unit): Unit = {
     assert(expected === runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten))
   }
```