Commit accc1fd

Merge pull request #111988 from dagiro/freshness_c10
freshness_c10
2 parents a2e14d4 + 38c2b07

1 file changed: 115 additions & 87 deletions
@@ -1,18 +1,18 @@
 ---
 title: Use Apache Spark to read and write data to Azure SQL Database
-description: Learn how to set up a connection between HDInsight Spark cluster and an Azure SQL Database to read data, write data, and stream data into a SQL database
+description: Learn how to set up a connection between an HDInsight Spark cluster and an Azure SQL Database. Read data, write data, and stream data into a SQL database.
 author: hrasheed-msft
 ms.author: hrasheed
 ms.reviewer: jasonh
 ms.service: hdinsight
 ms.topic: conceptual
 ms.custom: hdinsightactive
-ms.date: 03/05/2020
+ms.date: 04/20/2020
 ---

 # Use HDInsight Spark cluster to read and write data to Azure SQL Database

-Learn how to connect an Apache Spark cluster in Azure HDInsight with an Azure SQL Database and then read, write, and stream data into the SQL database. The instructions in this article use a [Jupyter Notebook](https://jupyter.org/) to run the Scala code snippets. However, you can create a standalone application in Scala or Python and perform the same tasks.
+Learn how to connect an Apache Spark cluster in Azure HDInsight with an Azure SQL Database. Then read, write, and stream data into the SQL database. The instructions in this article use a Jupyter Notebook to run the Scala code snippets. However, you can create a standalone application in Scala or Python and do the same tasks.

 ## Prerequisites

@@ -28,7 +28,7 @@ Learn how to connect an Apache Spark cluster in Azure HDInsight with an Azure SQ

 ## Create a Jupyter Notebook

-Start by creating a [Jupyter Notebook](https://jupyter.org/) associated with the Spark cluster. You use this notebook to run the code snippets used in this article.
+Start by creating a Jupyter Notebook associated with the Spark cluster. You use this notebook to run the code snippets used in this article.

 1. From the [Azure portal](https://portal.azure.com/), open your cluster.
 1. Select **Jupyter notebook** underneath **Cluster dashboards** on the right side. If you don't see **Cluster dashboards**, select **Overview** from the left menu. If prompted, enter the admin credentials for the cluster.
@@ -49,7 +49,7 @@ Start by creating a [Jupyter Notebook](https://jupyter.org/) associated with the
 > [!NOTE]
 > In this article, we use a Spark (Scala) kernel because streaming data from Spark into SQL database is only supported in Scala and Java currently. Even though reading from and writing into SQL can be done using Python, for consistency in this article, we use Scala for all three operations.

-1. This opens a new notebook with a default name, **Untitled**. Click the notebook name and enter a name of your choice.
+1. A new notebook opens with a default name, **Untitled**. Click the notebook name and enter a name of your choice.

     ![Provide a name for the notebook](./media/apache-spark-connect-to-sql-database/hdinsight-spark-jupyter-notebook-name.png "Provide a name for the notebook")

@@ -61,83 +61,105 @@ In this section, you read data from a table (for example, **SalesLT.Address**) t

 1. In a new Jupyter notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your Azure SQL Database.

-        // Declare the values for your Azure SQL database
+    ```scala
+    // Declare the values for your Azure SQL database

-        val jdbcUsername = "<SQL DB ADMIN USER>"
-        val jdbcPassword = "<SQL DB ADMIN PWD>"
-        val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
-        val jdbcPort = 1433
-        val jdbcDatabase ="<AZURE SQL DB NAME>"
+    val jdbcUsername = "<SQL DB ADMIN USER>"
+    val jdbcPassword = "<SQL DB ADMIN PWD>"
+    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" //typically, this is in the form of servername.database.windows.net
+    val jdbcPort = 1433
+    val jdbcDatabase = "<AZURE SQL DB NAME>"
+    ```

     Press **SHIFT + ENTER** to run the code cell.

 1. Use the snippet below to build a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a `Properties` object to hold the parameters. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

-        import java.util.Properties
+    ```scala
+    import java.util.Properties

-        val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
-        val connectionProperties = new Properties()
-        connectionProperties.put("user", s"${jdbcUsername}")
-        connectionProperties.put("password", s"${jdbcPassword}")
+    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
+    val connectionProperties = new Properties()
+    connectionProperties.put("user", s"${jdbcUsername}")
+    connectionProperties.put("password", s"${jdbcPassword}")
+    ```

 1. Use the snippet below to create a dataframe with the data from a table in your Azure SQL Database. In this snippet, we use a `SalesLT.Address` table that is available as part of the **AdventureWorksLT** database. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

-        val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
+    ```scala
+    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
+    ```

-1. You can now perform operations on the dataframe, such as getting the data schema:
+1. You can now do operations on the dataframe, such as getting the data schema:

-        sqlTableDF.printSchema
+    ```scala
+    sqlTableDF.printSchema
+    ```

-    You see an output similar to the following:
+    You see an output similar to the following image:

     ![schema output](./media/apache-spark-connect-to-sql-database/read-from-sql-schema-output.png "schema output")

-1. You can also perform operations like, retrieve the top 10 rows.
+1. You can also do operations like retrieving the top 10 rows.

-        sqlTableDF.show(10)
+    ```scala
+    sqlTableDF.show(10)
+    ```

 1. Or, retrieve specific columns from the dataset.

-        sqlTableDF.select("AddressLine1", "City").show(10)
+    ```scala
+    sqlTableDF.select("AddressLine1", "City").show(10)
+    ```

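Beyond `select`, the table argument of `spark.read.jdbc` also accepts a parenthesized subquery, so a filter can run on Azure SQL Database instead of pulling the whole table across. A minimal sketch, not part of this commit, assuming the `jdbc_url` and `connectionProperties` defined in the earlier cells:

```scala
// Push the filter down to Azure SQL Database; only matching rows are transferred.
// The subquery must be wrapped in parentheses and aliased so it acts as a table.
val bothellDF = spark.read.jdbc(
  jdbc_url,
  "(SELECT AddressLine1, City FROM SalesLT.Address WHERE City = 'Bothell') AS filtered",
  connectionProperties)
bothellDF.show()
```
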
 ## Write data into Azure SQL Database

 In this section, we use a sample CSV file available on the cluster to create a table in Azure SQL Database and populate it with data. The sample CSV file (**HVAC.csv**) is available on all HDInsight clusters at `HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv`.

 1. In a new Jupyter notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your Azure SQL Database.

-        // Declare the values for your Azure SQL database
+    ```scala
+    // Declare the values for your Azure SQL database

-        val jdbcUsername = "<SQL DB ADMIN USER>"
-        val jdbcPassword = "<SQL DB ADMIN PWD>"
-        val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
-        val jdbcPort = 1433
-        val jdbcDatabase ="<AZURE SQL DB NAME>"
+    val jdbcUsername = "<SQL DB ADMIN USER>"
+    val jdbcPassword = "<SQL DB ADMIN PWD>"
+    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" //typically, this is in the form of servername.database.windows.net
+    val jdbcPort = 1433
+    val jdbcDatabase = "<AZURE SQL DB NAME>"
+    ```

     Press **SHIFT + ENTER** to run the code cell.

 1. The following snippet builds a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a `Properties` object to hold the parameters. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

-        import java.util.Properties
+    ```scala
+    import java.util.Properties

-        val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
-        val connectionProperties = new Properties()
-        connectionProperties.put("user", s"${jdbcUsername}")
-        connectionProperties.put("password", s"${jdbcPassword}")
+    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"
+    val connectionProperties = new Properties()
+    connectionProperties.put("user", s"${jdbcUsername}")
+    connectionProperties.put("password", s"${jdbcPassword}")
+    ```

 1. Use the following snippet to extract the schema of the data in HVAC.csv and use the schema to load the data from the CSV in a dataframe, `readDf`. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

-        val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
-        val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
+    ```scala
+    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
+    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
+    ```

 1. Use the `readDf` dataframe to create a temporary table, `temphvactable`. Then use the temporary table to create a hive table, `hvactable_hive`.

-        readDf.createOrReplaceTempView("temphvactable")
-        spark.sql("create table hvactable_hive as select * from temphvactable")
+    ```scala
+    readDf.createOrReplaceTempView("temphvactable")
+    spark.sql("create table hvactable_hive as select * from temphvactable")
+    ```

 1. Finally, use the hive table to create a table in Azure SQL Database. The following snippet creates `hvactable` in Azure SQL Database.

-        spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
+    ```scala
+    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
+    ```

 1. Connect to the Azure SQL Database using SSMS and verify that you see a `dbo.hvactable` there.

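If you'd rather confirm the write from the notebook itself instead of SSMS, a quick read-back works. This sketch is not part of the commit and reuses the `jdbc_url` and `connectionProperties` from the earlier cells:

```scala
// Read the newly created table back from Azure SQL Database.
// A row count matching HVAC.csv confirms the write from hvactable_hive succeeded.
val hvacCheckDF = spark.read.jdbc(jdbc_url, "hvactable", connectionProperties)
println(s"Rows in hvactable: ${hvacCheckDF.count()}")
```
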
@@ -167,62 +189,68 @@ In this section, we stream data into the `hvactable` that you already created in

 1. Create a new Jupyter notebook on the HDInsight Spark cluster. In a code cell, paste the following snippet and then press **SHIFT + ENTER**:

-        import org.apache.spark.sql._
-        import org.apache.spark.sql.types._
-        import org.apache.spark.sql.functions._
-        import org.apache.spark.sql.streaming._
-        import java.sql.{Connection,DriverManager,ResultSet}
+    ```scala
+    import org.apache.spark.sql._
+    import org.apache.spark.sql.types._
+    import org.apache.spark.sql.functions._
+    import org.apache.spark.sql.streaming._
+    import java.sql.{Connection,DriverManager,ResultSet}
+    ```

 1. We stream data from the **HVAC.csv** into the `hvactable`. The HVAC.csv file is available on the cluster at `/HdiSamples/HdiSamples/SensorSampleData/HVAC/`. In the following snippet, we first get the schema of the data to be streamed. Then, we create a streaming dataframe using that schema. Paste the snippet in a code cell and press **SHIFT + ENTER** to run.

-        val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
-        val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/")
-        readStreamDf.printSchema
+    ```scala
+    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
+    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/")
+    readStreamDf.printSchema
+    ```

 1. The output shows the schema of **HVAC.csv**. The `hvactable` has the same schema as well. The output lists the columns in the table.

-    ![hdinsight Apache Spark schema table](./media/apache-spark-connect-to-sql-database/hdinsight-schema-table.png "Schema of table")
+    ![`hdinsight Apache Spark schema table`](./media/apache-spark-connect-to-sql-database/hdinsight-schema-table.png "Schema of table")

 1. Finally, use the following snippet to read data from the HVAC.csv and stream it into the `hvactable` in Azure SQL Database. Paste the snippet in a code cell, replace the placeholder values with the values for your Azure SQL Database, and then press **SHIFT + ENTER** to run.

-        val WriteToSQLQuery = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
-          var connection:java.sql.Connection = _
-          var statement:java.sql.Statement = _
-
-          val jdbcUsername = "<SQL DB ADMIN USER>"
-          val jdbcPassword = "<SQL DB ADMIN PWD>"
-          val jdbcHostname = "<SQL SERVER NAME HOSTING SDL DB>" //typically, this is in the form or servername.database.windows.net
-          val jdbcPort = 1433
-          val jdbcDatabase ="<AZURE SQL DB NAME>"
-          val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
-          val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
-
-          def open(partitionId: Long, version: Long):Boolean = {
-            Class.forName(driver)
-            connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
-            statement = connection.createStatement
-            true
-          }
-
-          def process(value: Row): Unit = {
-            val Date = value(0)
-            val Time = value(1)
-            val TargetTemp = value(2)
-            val ActualTemp = value(3)
-            val System = value(4)
-            val SystemAge = value(5)
-            val BuildingID = value(6)
-
-            val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
-            statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
-          }
-
-          def close(errorOrNull: Throwable): Unit = {
-            connection.close
-          }
-        })
-
-        var streamingQuery = WriteToSQLQuery.start()
+    ```scala
+    val WriteToSQLQuery = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
+      var connection:java.sql.Connection = _
+      var statement:java.sql.Statement = _
+
+      val jdbcUsername = "<SQL DB ADMIN USER>"
+      val jdbcPassword = "<SQL DB ADMIN PWD>"
+      val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" //typically, this is in the form of servername.database.windows.net
+      val jdbcPort = 1433
+      val jdbcDatabase = "<AZURE SQL DB NAME>"
+      val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+      val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
+
+      def open(partitionId: Long, version: Long):Boolean = {
+        Class.forName(driver)
+        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
+        statement = connection.createStatement
+        true
+      }
+
+      def process(value: Row): Unit = {
+        val Date = value(0)
+        val Time = value(1)
+        val TargetTemp = value(2)
+        val ActualTemp = value(3)
+        val System = value(4)
+        val SystemAge = value(5)
+        val BuildingID = value(6)
+
+        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
+        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
+      }
+
+      def close(errorOrNull: Throwable): Unit = {
+        connection.close
+      }
+    })
+
+    var streamingQuery = WriteToSQLQuery.start()
+    ```

 1. Verify that the data is being streamed into the `hvactable` by running the following query in SQL Server Management Studio (SSMS). Every time you run the query, it shows the number of rows in the table increasing.

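The streaming query keeps polling the monitored folder until it's stopped. Stopping it isn't shown in this diff, but the `StreamingQuery` handle returned by `start()` supports it; a minimal sketch:

```scala
// Stop the continuous stream started by WriteToSQLQuery.start().
// Until this runs, the query keeps scanning the CSV folder for new files.
streamingQuery.stop()
```
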
@@ -233,5 +261,5 @@ In this section, we stream data into the `hvactable` that you already created in
 ## Next steps

 * [Use HDInsight Spark cluster to analyze data in Data Lake Storage](apache-spark-use-with-data-lake-store.md)
-* [Process structured streaming events using EventHub](apache-spark-eventhub-structured-streaming.md)
+* [Load data and run queries on an Apache Spark cluster in Azure HDInsight](apache-spark-load-data-run-query.md)
 * [Use Apache Spark Structured Streaming with Apache Kafka on HDInsight](../hdinsight-apache-kafka-spark-structured-streaming.md)