Commit 7d20764: Merge pull request #205169 from sreekzz/docs-editor/apache-spark-connect-to-sql-da-1658227918 (Image Updates)

2 parents 4417c0c + b6103bc

4 files changed: +131 -135 lines

articles/hdinsight/spark/apache-spark-connect-to-sql-database.md (131 additions & 135 deletions)

Learn how to connect an Apache Spark cluster in Azure HDInsight with Azure SQL Database.

## Prerequisites

* Azure HDInsight Spark cluster. Follow the instructions at [Create an Apache Spark cluster in HDInsight](apache-spark-jupyter-spark-sql.md).

* Azure SQL Database. Follow the instructions at [Create a database in Azure SQL Database](/azure/azure-sql/database/single-database-create-quickstart). Make sure you create a database with the sample **AdventureWorksLT** schema and data. Also, make sure you create a server-level firewall rule to allow your client's IP address to access the SQL database. The instructions to add the firewall rule are available in the same article. Once you've created your SQL database, make sure you keep the following values handy. You need them to connect to the database from a Spark cluster.

  * Server name.
  * Database name.
  * Azure SQL Database admin user name / password.

* SQL Server Management Studio (SSMS). Follow the instructions at [Use SSMS to connect and query data](/azure/azure-sql/database/connect-query-ssms).

Start by creating a Jupyter Notebook associated with the Spark cluster.

1. From the [Azure portal](https://portal.azure.com/), open your cluster.

1. Select **Jupyter Notebook** underneath **Cluster dashboards** on the right side. If you don't see **Cluster dashboards**, select **Overview** from the left menu. If prompted, enter the admin credentials for the cluster.

    :::image type="content" source="./media/apache-spark-connect-to-sql-database/new-hdinsight-spark-cluster-dashboard-jupyter-notebook.png" alt-text="Jupyter Notebook on Apache Spark" border="true":::

    > [!NOTE]
    > You can also access the Jupyter Notebook on Spark cluster by opening the following URL in your browser. Replace **CLUSTERNAME** with the name of your cluster:
    >
    > `https://CLUSTERNAME.azurehdinsight.net/jupyter`
1. In the Jupyter Notebook, from the top-right corner, click **New**, and then click **Spark** to create a Scala notebook. Jupyter Notebooks on HDInsight Spark cluster also provide the **PySpark** kernel for Python2 applications, and the **PySpark3** kernel for Python3 applications. For this article, we create a Scala notebook.

    :::image type="content" source="./media/apache-spark-connect-to-sql-database/new-kernel-jupyter-notebook-on-spark.png" alt-text="Kernels for Jupyter Notebook on Spark" border="true":::

    For more information about the kernels, see [Use Jupyter Notebook kernels with Apache Spark clusters in HDInsight](apache-spark-jupyter-notebook-kernels.md).

    > [!NOTE]
    > In this article, we use a Spark (Scala) kernel because streaming data from Spark into SQL Database is currently supported only in Scala and Java. Even though reading from and writing into SQL can be done using Python, for consistency in this article, we use Scala for all three operations.
1. A new notebook opens with a default name, **Untitled**. Click the notebook name and enter a name of your choice.

    :::image type="content" source="./media/apache-spark-connect-to-sql-database/new-hdinsight-spark-jupyter-notebook-name.png" alt-text="Provide a name for the notebook" border="true":::

    You can now start creating your application.
## Read data from Azure SQL Database

In this section, you read data from a table (for example, **SalesLT.Address**) that exists in the AdventureWorks database.

1. In a new Jupyter Notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your database.

    ```scala
    // Declare the values for your database

    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is in the form servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase = "<AZURE SQL DB NAME>"
    ```

    Press **SHIFT + ENTER** to run the code cell.

1. Use the snippet below to build a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a `Properties` object to hold the parameters. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

    ```scala
    import java.util.Properties

    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    ```
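
    Depending on the Spark and JDBC driver versions on your cluster, you may also need to name the driver class explicitly. A minimal sketch, assuming the standard Microsoft SQL Server JDBC driver; whether this line is required depends on your cluster:

    ```scala
    // Optional (assumption: only needed if Spark can't infer the driver from the URL).
    connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    ```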

1. Use the snippet below to create a dataframe with the data from a table in your database. In this snippet, we use a `SalesLT.Address` table that is available as part of the **AdventureWorksLT** database. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

    ```scala
    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
    ```
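
    If you only need a subset of the table, the JDBC reader also accepts a parenthesized, aliased subquery in place of the table name, so the filtering runs on the database side rather than in Spark. A sketch, assuming the **AdventureWorksLT** column names (the subquery and alias here are illustrative):

    ```scala
    // Hypothetical pushdown: the database evaluates the WHERE clause, not Spark.
    val usAddressesDF = spark.read.jdbc(
      jdbc_url,
      "(SELECT AddressLine1, City FROM SalesLT.Address WHERE CountryRegion = 'United States') AS us_addresses",
      connectionProperties)
    ```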

1. You can now do operations on the dataframe, such as getting the data schema:

    ```scala
    sqlTableDF.printSchema
    ```

    You see an output similar to the following image:

    :::image type="content" source="./media/apache-spark-connect-to-sql-database/read-from-sql-schema-output.png" alt-text="schema output" border="true":::

1. You can also do operations like retrieving the top 10 rows:

    ```scala
    sqlTableDF.show(10)
    ```

1. Or, retrieve specific columns from the dataset:

    ```scala
    sqlTableDF.select("AddressLine1", "City").show(10)
    ```
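
    Standard dataframe transformations can also be chained before `show`. For example, a sketch of Spark-side filtering, where the city value is a hypothetical example from the sample data:

    ```scala
    // Filter rows after they are read from the database (filtering happens in Spark).
    sqlTableDF.filter("City = 'Bothell'").select("AddressLine1", "City").show(10)
    ```
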
## Write data into Azure SQL Database

In this section, we use a sample CSV file available on the cluster to create a table in your database and populate it with data. The sample CSV file (**HVAC.csv**) is available on all HDInsight clusters at `HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv`.

1. In a new Jupyter Notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your database.

    ```scala
    // Declare the values for your database

    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is in the form servername.database.windows.net
    val jdbcPort = 1433
    val jdbcDatabase = "<AZURE SQL DB NAME>"
    ```

    Press **SHIFT + ENTER** to run the code cell.

1. The following snippet builds a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a `Properties` object to hold the parameters. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

    ```scala
    import java.util.Properties

    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    ```

1. Use the following snippet to extract the schema of the data in HVAC.csv and use the schema to load the data from the CSV into a dataframe, `readDf`. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

    ```scala
    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    ```
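
    Before writing anywhere, you can print the schema that was inferred from the CSV header as a quick sanity check (a small verification step, not part of the original walkthrough):

    ```scala
    // Should list the HVAC columns (Date, Time, target/actual temperatures, and so on).
    readDf.printSchema
    ```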

1. Use the `readDf` dataframe to create a temporary table, `temphvactable`. Then use the temporary table to create a Hive table, `hvactable_hive`.

    ```scala
    readDf.createOrReplaceTempView("temphvactable")
    spark.sql("create table hvactable_hive as select * from temphvactable")
    ```
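
    To confirm the Hive table was populated, a hypothetical sanity check (one row per CSV record is expected):

    ```scala
    // Count the rows that landed in the Hive table.
    spark.sql("select count(*) from hvactable_hive").show()
    ```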

1. Finally, use the Hive table to create a table in your database. The following snippet creates `hvactable` in Azure SQL Database.

    ```scala
    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
    ```
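
    Note that this write fails if `hvactable` already exists, because the default save mode errors on existing tables. A sketch of one way to make the step rerunnable (an assumption about how you want reruns handled, not part of the original walkthrough):

    ```scala
    // "append" adds rows to an existing table; "overwrite" would drop and re-create it.
    spark.table("hvactable_hive").write.mode("append").jdbc(jdbc_url, "hvactable", connectionProperties)
    ```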

1. Connect to the Azure SQL Database using SSMS and verify that you see a `dbo.hvactable` there.
171168

1. Run a query in SSMS to see the columns in the table.

    ```sql
    SELECT * FROM hvactable
    ```

## Stream data into Azure SQL Database

In this section, we stream data into the `hvactable` that you created in the previous section.

1. As a first step, make sure there are no records in the `hvactable`. Using SSMS, run the following query on the table.

    ```sql
    TRUNCATE TABLE [dbo].[hvactable]
    ```

1. Create a new Jupyter Notebook on the HDInsight Spark cluster. In a code cell, paste the following snippet and then press **SHIFT + ENTER**:

    ```scala
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import java.sql.{Connection, DriverManager, ResultSet}
    ```

1. We stream data from the **HVAC.csv** file into the `hvactable`. The HVAC.csv file is available on the cluster at `/HdiSamples/HdiSamples/SensorSampleData/hvac/`. In the following snippet, we first get the schema of the data to be streamed. Then, we create a streaming dataframe using that schema. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

    ```scala
    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/")
    readStreamDf.printSchema
    ```
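
    You can confirm that `readStreamDf` is a streaming dataframe, as opposed to a batch one, with a quick check (an illustrative verification, not in the original snippet):

    ```scala
    // Returns true for a streaming dataframe.
    readStreamDf.isStreaming
    ```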

1. The output shows the schema of **HVAC.csv**. The `hvactable` has the same schema. The output lists the columns in the table.

    :::image type="content" source="./media/apache-spark-connect-to-sql-database/hdinsight-schema-table.png" alt-text="HDInsight Apache Spark schema table" border="true":::

1. Finally, use the following snippet to read data from the HVAC.csv and stream it into the `hvactable` in your database. Paste the snippet in a code cell, replace the placeholder values with the values for your database, and then press **SHIFT + ENTER** to run.

    ```scala
    val WriteToSQLQuery = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
      var connection: java.sql.Connection = _
      var statement: java.sql.Statement = _

      val jdbcUsername = "<SQL DB ADMIN USER>"
      val jdbcPassword = "<SQL DB ADMIN PWD>"
      val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is in the form servername.database.windows.net
      val jdbcPort = 1433
      val jdbcDatabase = "<AZURE SQL DB NAME>"
      val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

      // Open a JDBC connection once per partition of the streaming output.
      def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
        statement = connection.createStatement
        true
      }

      // Insert each streamed row into dbo.hvactable.
      def process(value: Row): Unit = {
        val Date = value(0)
        val Time = value(1)
        val TargetTemp = value(2)
        val ActualTemp = value(3)
        val System = value(4)
        val SystemAge = value(5)
        val BuildingID = value(6)

        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
      }

      // Close the connection when the partition has been processed.
      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
    })

    var streamingQuery = WriteToSQLQuery.start()
    ```
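
    A Structured Streaming query runs until it's stopped or fails. Two standard `StreamingQuery` calls are useful for managing it from the notebook (how you use them is up to you; they aren't part of the original snippet):

    ```scala
    // Block the current cell until the query stops or fails.
    streamingQuery.awaitTermination()
    ```

    From another cell, `streamingQuery.stop()` ends the stream cleanly when you're done.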

1. Verify that the data is being streamed into the `hvactable` by running the following query in SQL Server Management Studio (SSMS). Every time you run the query, it shows the number of rows in the table increasing.

    ```sql
    SELECT COUNT(*) FROM hvactable
    ```

## Next steps