Skip to content

Commit 0377338

Browse files
LucaCanali authored and gatorsmile committed
[SPARK-21519][SQL] Add an option to the JDBC data source to initialize the target DB environment
Add an option to the JDBC data source to initialize the environment of the remote database session ## What changes were proposed in this pull request? This proposes an option to the JDBC datasource, tentatively called "sessionInitStatement" to implement the functionality of session initialization present for example in the Sqoop connector for Oracle (see https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_oraoop_oracle_session_initialization_statements ) . After each database session is opened to the remote DB, and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block in the case of Oracle). See also https://issues.apache.org/jira/browse/SPARK-21519 ## How was this patch tested? Manually tested using Spark SQL data source and Oracle JDBC Author: LucaCanali <[email protected]> Closes apache#18724 from LucaCanali/JDBC_datasource_sessionInitStatement.
1 parent 2387f1e commit 0377338

File tree

4 files changed

+56
-0
lines changed

4 files changed

+56
-0
lines changed

docs/sql-programming-guide.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,13 @@ the following case-insensitive options:
13081308
</td>
13091309
</tr>
13101310

1311+
<tr>
1312+
<td><code>sessionInitStatement</code></td>
1313+
<td>
1314+
After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: <code>option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")</code>
1315+
</td>
1316+
</tr>
1317+
13111318
<tr>
13121319
<td><code>truncate</code></td>
13131320
<td>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ class JDBCOptions(
138138
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
139139
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
140140
}
141+
// An option to execute custom SQL before fetching data from the remote DB
142+
val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
141143
}
142144

143145
object JDBCOptions {
@@ -161,4 +163,5 @@ object JDBCOptions {
161163
val JDBC_CREATE_TABLE_COLUMN_TYPES = newOption("createTableColumnTypes")
162164
val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
163165
val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
166+
val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
164167
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,21 @@ private[jdbc] class JDBCRDD(
273273
import scala.collection.JavaConverters._
274274
dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
275275

276+
// This executes a generic SQL statement (or PL/SQL block) before reading
277+
// the table/query via JDBC. Use this feature to initialize the database
278+
// session environment, e.g. for optimizations and/or troubleshooting.
279+
options.sessionInitStatement match {
280+
case Some(sql) =>
281+
val statement = conn.prepareStatement(sql)
282+
logInfo(s"Executing sessionInitStatement: $sql")
283+
try {
284+
statement.execute()
285+
} finally {
286+
statement.close()
287+
}
288+
case None =>
289+
}
290+
276291
// H2's JDBC driver does not support the setSchema() method. We pass a
277292
// fully-qualified table name in the SELECT statement. I don't know how to
278293
// talk about a table in a completely portable way.

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,4 +1044,35 @@ class JDBCSuite extends SparkFunSuite
10441044
assert(sql("select * from people_view").count() == 3)
10451045
}
10461046
}
1047+
1048+
test("SPARK-21519: option sessionInitStatement, run SQL to initialize the database session.") {
  // Builds a JDBC DataFrame against the test database, configured with the
  // given table/query and a sessionInitStatement to run on session open.
  def jdbcWithInit(table: String, initSql: String) =
    spark.read.format("jdbc")
      .option("url", urlWithUserAndPass)
      .option("dbtable", table)
      .option("sessionInitStatement", initSql)
      .load()

  // A session variable set by the init statement must be visible to the query.
  val singleVarDF = jdbcWithInit("(SELECT NVL(@MYTESTVAR, -1))", "SET @MYTESTVAR 21519")
  assert(singleVarDF.collect() === Array(Row(21519)))

  // An init statement that switches to a nonexistent schema should make the
  // subsequent read fail with the database's "schema not found" error.
  val brokenSchemaDF = jdbcWithInit("TEST.PEOPLE", "SET SCHEMA DUMMY")
  val errorMessage = intercept[SparkException] { brokenSchemaDF.collect() }.getMessage
  assert(errorMessage.contains("""Schema "DUMMY" not found"""))

  // The option must also be honored when the source is declared via SQL DDL,
  // including an init statement containing multiple semicolon-separated commands.
  sql(
    s"""
       |CREATE OR REPLACE TEMPORARY VIEW test_sessionInitStatement
       |USING org.apache.spark.sql.jdbc
       |OPTIONS (url '$urlWithUserAndPass',
       |dbtable '(SELECT NVL(@MYTESTVAR1, -1), NVL(@MYTESTVAR2, -1))',
       |sessionInitStatement 'SET @MYTESTVAR1 21519; SET @MYTESTVAR2 1234')
      """.stripMargin)
  val twoVarsDF = sql("SELECT * FROM test_sessionInitStatement")
  assert(twoVarsDF.collect() === Array(Row(21519, 1234)))
}
10471078
}

0 commit comments

Comments
 (0)