
Commit 21fcac1

maryannxue authored and gatorsmile committed
[SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate pushdown
## What changes were proposed in this pull request?

Add a JDBC option `pushDownPredicate` (default `true`) to allow/disallow predicate push-down in the JDBC data source.

## How was this patch tested?

Add a test in `JDBCSuite`.

Author: maryannxue <[email protected]>

Closes apache#21875 from maryannxue/spark-24288.
1 parent e6e9031 commit 21fcac1
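For context, a minimal usage sketch of the new option from the DataFrame reader API; the connection URL, table name, and credentials below are illustrative placeholders, not values from this change:

```scala
// Sketch only: disable predicate push-down for a single JDBC read.
// "jdbc:h2:mem:testdb0", "TEST.PEOPLE", and the credentials are assumed placeholder values.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb0")
  .option("dbtable", "TEST.PEOPLE")
  .option("user", "testUser")
  .option("password", "testPass")
  .option("pushDownPredicate", "false") // option added by this commit; defaults to true
  .load()

// With push-down disabled, this predicate is evaluated by Spark itself rather than
// being compiled into a WHERE clause sent to the database.
df.filter("theid = 1").show()
```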

4 files changed: +63 −20 lines


docs/sql-programming-guide.md

Lines changed: 7 additions & 0 deletions
@@ -1435,6 +1435,13 @@ the following case-insensitive options:
       The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
+
+  <tr>
+    <td><code>pushDownPredicate</code></td>
+    <td>
+      The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
+    </td>
+  </tr>
 </table>

 <div class="codetabs">
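The same option can also be supplied when registering a JDBC table as a temporary view, which the new test in `JDBCSuite` exercises as well; a hedged sketch with placeholder URL and table name:

```scala
// Sketch only: pushDownPredicate in the CREATE TEMPORARY VIEW path.
// The connection URL and table name are assumed placeholders.
spark.sql(
  """
    |CREATE OR REPLACE TEMPORARY VIEW people_no_pushdown
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (
    |  url 'jdbc:h2:mem:testdb0;user=testUser;password=testPass',
    |  dbtable 'TEST.PEOPLE',
    |  pushDownPredicate 'false'
    |)
  """.stripMargin)

// Filters over this view are handled by Spark, not pushed to the database.
spark.sql("SELECT name, theid FROM people_no_pushdown WHERE theid = 1").show()
```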

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 4 additions & 0 deletions
@@ -183,6 +183,9 @@ class JDBCOptions(
   }
   // An option to execute custom SQL before fetching data from the remote DB
   val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
+
+  // An option to allow/disallow pushing down predicate into JDBC data source
+  val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
 }

 class JdbcOptionsInWrite(
@@ -234,4 +237,5 @@ object JDBCOptions {
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
+  val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 5 additions & 1 deletion
@@ -172,7 +172,11 @@ private[sql] case class JDBCRelation(

   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    if (jdbcOptions.pushDownPredicate) {
+      filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    } else {
+      filters
+    }
   }

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
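The effect of this change is that, with `pushDownPredicate` set to false, `unhandledFilters` reports every filter back to Spark, so the physical plan keeps a `Filter` operator above the JDBC scan (the new test below asserts this via `FilterExec`). A rough way to observe the difference interactively, with placeholder connection settings, might look like:

```scala
// Sketch only: compare physical plans with and without predicate push-down.
// URL and table name are assumed placeholders.
def jdbcPeople(pushDown: Boolean) = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb0")
  .option("dbtable", "TEST.PEOPLE")
  .option("pushDownPredicate", pushDown.toString)
  .load()
  .filter("theid = 1")

// Expectation: the first plan carries the predicate inside the JDBC scan
// (e.g. as a pushed filter), while the second retains a separate Filter node.
jdbcPeople(pushDown = true).explain()
jdbcPeople(pushDown = false).explain()
```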

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 47 additions & 19 deletions
@@ -261,21 +261,32 @@ class JDBCSuite extends QueryTest
       s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but got:`$jdbcRelations`")
   }

+  private def checkPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is removed in a physical plan and
+    // the plan only has PhysicalRDD to scan JDBCRelation.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+    assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+    df
+  }
+
+  private def checkNotPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+    // cannot compile given predicates.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    df
+  }
+
   test("SELECT *") {
     assert(sql("SELECT * FROM foobar").collect().size === 3)
   }

   test("SELECT * WHERE (simple predicates)") {
-    def checkPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is removed in a physical plan and
-      // the plan only has PhysicalRDD to scan JDBCRelation.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-      assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
-      df
-    }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
@@ -308,15 +319,6 @@ class JDBCSuite extends QueryTest
       "WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
     assert(df2.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))

-    def checkNotPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
-      // cannot compile given predicates.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
-      df
-    }
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }
@@ -1375,4 +1377,30 @@ class JDBCSuite extends QueryTest
       Row("fred", 1) :: Nil)

   }
+
+  test("SPARK-24288: Enable preventing predicate pushdown") {
+    val table = "test.people"
+
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbTable", table)
+      .option("pushDownPredicate", false)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    checkAnswer(
+      checkNotPushdown(df),
+      Row("fred", 1) :: Nil)
+
+    // pushDownPredicate option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW predicateOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$urlWithUserAndPass', dbTable '$table', pushDownPredicate 'false')
+       """.stripMargin.replaceAll("\n", " "))
+    checkAnswer(
+      checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
+      Row("fred", 1) :: Nil)
+  }
 }
