description: Learn how to use Apache Spark streaming to get data into or out of Apache Kafka. In this tutorial, you stream data using a Jupyter notebook from Spark on HDInsight.
author: hrasheed-msft
ms.author: hrasheed
ms.reviewer: jasonh
ms.service: hdinsight
ms.custom: hdinsightactive,seodec18
ms.topic: tutorial
ms.date: 10/08/2019

#Customer intent: As a developer, I want to learn how to use Spark Structured Streaming with Kafka on HDInsight.
---

> [!IMPORTANT]
> The steps in this document require an Azure resource group that contains both a Spark on HDInsight and a Kafka on HDInsight cluster. These clusters are both located within an Azure Virtual Network, which allows the Spark cluster to directly communicate with the Kafka cluster.
>
> For your convenience, this document links to a template that can create all the required Azure resources.
>
> For more information on using HDInsight in a virtual network, see the [Plan a virtual network for HDInsight](hdinsight-plan-virtual-network-deployment.md) document.

| Batch | Streaming |
| --- | --- |
|`write`|`writeStream`|
|`save`|`start`|

The streaming operation also uses `awaitTermination(30000)`, which stops the stream after 30,000 ms.
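
As a rough sketch of that difference (using placeholder DataFrames `batchDF` and `streamDF` and placeholder output paths, not the tutorial's actual cells, which appear later in this document):

```scala
// Batch: a one-time write that finishes when save() returns
batchDF.write
  .format("parquet")
  .option("path", "/example/batchsketch")
  .save()

// Streaming: start() launches a continuous query;
// awaitTermination(30000) waits up to 30,000 ms before the cell moves on
streamDF.writeStream
  .format("parquet")
  .option("path", "/example/streamsketch")
  .option("checkpointLocation", "/example/streamsketchcheckpoint")
  .start()
  .awaitTermination(30000)
```
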
To use Structured Streaming with Kafka, your project must have a dependency on the `org.apache.spark : spark-sql-kafka-0-10_2.11` package. The version of this package should match the version of Spark on HDInsight. For Spark 2.2.0 (available in HDInsight 3.6), you can find the dependency information for different project types at [https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar](https://search.maven.org/#artifactdetails%7Corg.apache.spark%7Cspark-sql-kafka-0-10_2.11%7C2.2.0%7Cjar).
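
For example, in an sbt-based project the dependency might be declared as follows. This is only a sketch using the coordinates from the Maven Central link above; substitute the version that matches the Spark version on your cluster.

```scala
// build.sbt (sketch): Kafka source/sink for Spark Structured Streaming.
// The artifact name carries the Scala version (_2.11); 2.2.0 matches Spark 2.2.0 on HDInsight 3.6.
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0"
```
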

## Create the clusters

Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Anything that uses Kafka must be in the same Azure virtual network. In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.

The following diagram shows how communication flows between Spark and Kafka:

| Cluster Login Password | The admin user password for the clusters. |
| SSH User Name | The SSH user to create for the clusters. |
| SSH Password | The password for the SSH user. |

3. Read the **Terms and Conditions**, and then select **I agree to the terms and conditions stated above**.

4. Select **Purchase**.

> [!NOTE]
> It can take up to 20 minutes to create the clusters.

3. Select **New > Spark** to create a notebook.

4. Spark streaming uses micro-batching, which means data arrives in batches and executors run on those batches of data. If the executor idle timeout is less than the time it takes to process a batch, executors are constantly added and removed. If the executor idle timeout is greater than the batch duration, the executor never gets removed. Hence **we recommend that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.**

Load packages used by the Notebook by entering the following information in a Notebook cell. Run the command by using **CTRL + ENTER**.

```configuration
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0",
        "spark.dynamicAllocation.enabled": false
    }
}
```

```scala
// Load the data from the New York City Taxi data REST API for 2016 Green Taxi Trip Data
val url="https://data.cityofnewyork.us/resource/pqfs-mqru.json"
val result = scala.io.Source.fromURL(url).mkString

// Create a dataframe from the JSON data
val taxiDF = spark.read.json(Seq(result).toDS)

// Display the dataframe containing trip data
taxiDF.show()
```

```scala
// The Kafka broker hosts and topic used to write to Kafka
val kafkaBrokers="YOUR_KAFKA_BROKER_HOSTS"
val kafkaTopic="tripdata"

println("Finished setting Kafka broker and topic configuration.")
```

```scala
// .add("trip_distance", StringType)
// .add("trip_type", StringType)
// .add("vendorid", StringType)

println("Schema declared")
```

```scala
// Read a batch from Kafka
val kafkaDF = spark.read.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()

// Select data and write to file
val query = kafkaDF.select(from_json(col("value").cast("string"), schema) as "trip").write.format("parquet").option("path","/example/batchtripdata").option("checkpointLocation", "/batchcheckpoint").save()

println("Wrote data to file")
```

```scala
// Stream from Kafka
val kafkaStreamDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", kafkaBrokers).option("subscribe", kafkaTopic).option("startingOffsets", "earliest").load()

// Select data from the stream and write to file
kafkaStreamDF.select(from_json(col("value").cast("string"), schema) as "trip").writeStream.format("parquet").option("path","/example/streamingtripdata").option("checkpointLocation", "/streamcheckpoint").start.awaitTermination(30000)

println("Wrote data to file")
```

> [!WARNING]
> HDInsight cluster billing starts once a cluster is created and stops when the cluster is deleted. Billing is pro-rated per minute, so you should always delete your cluster when it is no longer in use.
>
> Deleting a Kafka on HDInsight cluster deletes any data stored in Kafka.