Skip to content

Commit 9fc58aa

Browse files
beliefercloud-fan
authored andcommitted
[SPARK-49488][SQL] MySQL dialect supports pushdown datetime functions
### What changes were proposed in this pull request? This PR propose to make MySQL dialect supports pushdown datetime functions. ### Why are the changes needed? Currently, DS V2 pushdown framework pushed the datetime functions with in a common way. But MySQL doesn't support some datetime functions. ### Does this PR introduce _any_ user-facing change? 'No'. This is a new feature for MySQL dialect. ### How was this patch tested? GA. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes apache#47951 from beliefer/SPARK-49488. Authored-by: beliefer <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 5533c81 commit 9fc58aa

File tree

3 files changed

+114
-3
lines changed

3 files changed

+114
-3
lines changed

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,19 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
7777
s"""CREATE TABLE pattern_testing_table (
7878
|pattern_testing_col LONGTEXT
7979
|)
80-
""".stripMargin
80+
|""".stripMargin
8181
).executeUpdate()
82+
connection.prepareStatement(
83+
"CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
84+
.executeUpdate()
85+
}
86+
87+
override def dataPreparation(connection: Connection): Unit = {
88+
super.dataPreparation(connection)
89+
connection.prepareStatement("INSERT INTO datetime VALUES " +
90+
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
91+
connection.prepareStatement("INSERT INTO datetime VALUES " +
92+
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
8293
}
8394

8495
override def testUpdateColumnType(tbl: String): Unit = {
@@ -157,6 +168,79 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
157168
assert(sql(s"SELECT char_length(c1) from $tableName").head().get(0) === 65536)
158169
}
159170
}
171+
172+
override def testDatetime(tbl: String): Unit = {
173+
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
174+
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
175+
checkFilterPushed(df1)
176+
val rows1 = df1.collect()
177+
assert(rows1.length === 2)
178+
assert(rows1(0).getString(0) === "amy")
179+
assert(rows1(1).getString(0) === "alex")
180+
181+
val df2 = sql(s"SELECT name FROM $tbl WHERE year(date1) = 2022 AND quarter(date1) = 2")
182+
checkFilterPushed(df2)
183+
val rows2 = df2.collect()
184+
assert(rows2.length === 2)
185+
assert(rows2(0).getString(0) === "amy")
186+
assert(rows2(1).getString(0) === "alex")
187+
188+
val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5")
189+
checkFilterPushed(df3)
190+
val rows3 = df3.collect()
191+
assert(rows3.length === 2)
192+
assert(rows3(0).getString(0) === "amy")
193+
assert(rows3(1).getString(0) === "alex")
194+
195+
val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
196+
checkFilterPushed(df4)
197+
val rows4 = df4.collect()
198+
assert(rows4.length === 2)
199+
assert(rows4(0).getString(0) === "amy")
200+
assert(rows4(1).getString(0) === "alex")
201+
202+
val df5 = sql(s"SELECT name FROM $tbl WHERE " +
203+
"extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
204+
checkFilterPushed(df5)
205+
val rows5 = df5.collect()
206+
assert(rows5.length === 2)
207+
assert(rows5(0).getString(0) === "amy")
208+
assert(rows5(1).getString(0) === "alex")
209+
210+
val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
211+
"AND datediff(date1, '2022-05-10') > 0")
212+
checkFilterPushed(df6)
213+
val rows6 = df6.collect()
214+
assert(rows6.length === 1)
215+
assert(rows6(0).getString(0) === "amy")
216+
217+
val df7 = sql(s"SELECT name FROM $tbl WHERE weekday(date1) = 2")
218+
checkFilterPushed(df7)
219+
val rows7 = df7.collect()
220+
assert(rows7.length === 1)
221+
assert(rows7(0).getString(0) === "alex")
222+
223+
val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
224+
checkFilterPushed(df8)
225+
val rows8 = df8.collect()
226+
assert(rows8.length === 1)
227+
assert(rows8(0).getString(0) === "alex")
228+
229+
val df9 = sql(s"SELECT name FROM $tbl WHERE " +
230+
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
231+
checkFilterPushed(df9)
232+
val rows9 = df9.collect()
233+
assert(rows9.length === 1)
234+
assert(rows9(0).getString(0) === "alex")
235+
236+
// MySQL does not support
237+
val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
238+
checkFilterPushed(df10, false)
239+
val rows10 = df10.collect()
240+
assert(rows10.length === 2)
241+
assert(rows10(0).getString(0) === "amy")
242+
assert(rows10(1).getString(0) === "alex")
243+
}
160244
}
161245

162246
/**

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
353353
}
354354
}
355355

356-
private def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
356+
protected def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
357357
val filter = df.queryExecution.optimizedPlan.collect {
358358
case f: Filter => f
359359
}
@@ -980,4 +980,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
980980
)
981981
}
982982
}
983+
984+
def testDatetime(tbl: String): Unit = {}
985+
986+
test("scan with filter push-down with date time functions") {
987+
testDatetime(s"$catalogAndNamespace.${caseConvert("datetime")}")
988+
}
983989
}

sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,33 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No
4646
// See https://dev.mysql.com/doc/refman/8.0/en/aggregate-functions.html
4747
private val supportedAggregateFunctions =
4848
Set("MAX", "MIN", "SUM", "COUNT", "AVG") ++ distinctUnsupportedAggregateFunctions
49-
private val supportedFunctions = supportedAggregateFunctions
49+
private val supportedFunctions = supportedAggregateFunctions ++ Set("DATE_ADD", "DATE_DIFF")
5050

5151
override def isSupportedFunction(funcName: String): Boolean =
5252
supportedFunctions.contains(funcName)
5353

5454
class MySQLSQLBuilder extends JDBCSQLBuilder {
55+
override def visitExtract(field: String, source: String): String = {
56+
field match {
57+
case "DAY_OF_YEAR" => s"DAYOFYEAR($source)"
58+
case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
59+
// WEEKDAY uses Monday = 0, Tuesday = 1, ... and ISO standard is Monday = 1, ...,
60+
// so we use the formula (WEEKDAY + 1) to follow the ISO standard.
61+
case "DAY_OF_WEEK" => s"(WEEKDAY($source) + 1)"
62+
case _ => super.visitExtract(field, source)
63+
}
64+
}
65+
66+
override def visitSQLFunction(funcName: String, inputs: Array[String]): String = {
67+
funcName match {
68+
case "DATE_ADD" =>
69+
s"DATE_ADD(${inputs(0)}, INTERVAL ${inputs(1)} DAY)"
70+
case "DATE_DIFF" =>
71+
s"DATEDIFF(${inputs(0)}, ${inputs(1)})"
72+
case _ => super.visitSQLFunction(funcName, inputs)
73+
}
74+
}
75+
5576
override def visitSortOrder(
5677
sortKey: String, sortDirection: SortDirection, nullOrdering: NullOrdering): String = {
5778
(sortDirection, nullOrdering) match {

0 commit comments

Comments
 (0)