Commit 6ee0012

Merge pull request #107289 from dagiro/freshness24
freshness24
2 parents 6b5242b + 738148d commit 6ee0012

File tree

1 file changed

+24
-26
lines changed


articles/hdinsight/hdinsight-apache-kafka-spark-structured-streaming.md

Lines changed: 24 additions & 26 deletions
@@ -5,16 +5,16 @@ author: hrasheed-msft
 ms.author: hrasheed
 ms.reviewer: jasonh
 ms.service: hdinsight
-ms.custom: hdinsightactive,seodec18
 ms.topic: tutorial
-ms.date: 10/08/2019
+ms.custom: hdinsightactive,seodec18
+ms.date: 03/11/2020
 
 #Customer intent: As a developer, I want to learn how to use Spark Structured Streaming with Kafka on HDInsight.
 ---
 
 # Tutorial: Use Apache Spark Structured Streaming with Apache Kafka on HDInsight
 
-This tutorial demonstrates how to use [Apache Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide) to read and write data with [Apache Kafka](https://kafka.apache.org/) on Azure HDInsight.
+This tutorial demonstrates how to use [Apache Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide) to read and write data with [Apache Kafka](./kafka/apache-kafka-introduction.md) on Azure HDInsight.
 
 Spark Structured Streaming is a stream processing engine built on Spark SQL. It allows you to express streaming computations the same as batch computation on static data.
 
@@ -24,7 +24,7 @@ In this tutorial, you learn how to:
 > * Use an Azure Resource Manager template to create clusters
 > * Use Spark Structured Streaming with Kafka
 
-When you are done with the steps in this document, remember to delete the clusters to avoid excess charges.
+When you're done with the steps in this document, remember to delete the clusters to avoid excess charges.
 
 ## Prerequisites
 
@@ -109,7 +109,7 @@ For the Jupyter Notebook used with this tutorial, the following cell loads this
 
 ## Create the clusters
 
-Apache Kafka on HDInsight does not provide access to the Kafka brokers over the public internet. Anything that uses Kafka must be in the same Azure virtual network. In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.
+Apache Kafka on HDInsight doesn't provide access to the Kafka brokers over the public internet. Anything that uses Kafka must be in the same Azure virtual network. In this tutorial, both the Kafka and Spark clusters are located in the same Azure virtual network.
 
 The following diagram shows how communication flows between Spark and Kafka:
 
@@ -151,7 +151,7 @@ To create an Azure Virtual Network, and then create the Kafka and Spark clusters
 
    ![Screenshot of the customized template](./media/hdinsight-apache-kafka-spark-structured-streaming/spark-kafka-template.png)
 
-3. Read the **Terms and Conditions**, and then select **I agree to the terms and conditions stated above**.
+3. Read the **Terms and Conditions**, then select **I agree to the terms and conditions stated above**.
 
 4. Select **Purchase**.
 
@@ -165,23 +165,21 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
 1. Gather host information. Use the curl and [jq](https://stedolan.github.io/jq/) commands below to obtain your Kafka ZooKeeper and broker hosts information. The commands are designed for a Windows command prompt; slight variations will be needed for other environments. Replace `KafkaCluster` with the name of your Kafka cluster, and `KafkaPassword` with the cluster login password. Also, replace `C:\HDI\jq-win64.exe` with the actual path to your jq installation. Enter the commands in a Windows command prompt and save the output for use in later steps.
 
    ```cmd
+   REM Enter cluster name in lowercase
+
    set CLUSTERNAME=KafkaCluster
    set PASSWORD=KafkaPassword
-
+
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):2181"""] | join(""",""")"
-
+
    curl -u admin:%PASSWORD% -G "https://%CLUSTERNAME%.azurehdinsight.net/api/v1/clusters/%CLUSTERNAME%/services/KAFKA/components/KAFKA_BROKER" | C:\HDI\jq-win64.exe -r "["""\(.host_components[].HostRoles.host_name):9092"""] | join(""",""")"
    ```
 
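The jq filter in the commands above simply collects every `host_components[].HostRoles.host_name` value from the Ambari response, appends a port, and joins the results with commas. As a rough illustration of that transformation (the JSON below is a hand-made stand-in for the API response shape, and the host names are hypothetical), the same logic in Python looks like:

```python
import json

# Hypothetical, minimal stand-in for the Ambari API response shape
# returned by the curl commands; real responses contain many more fields.
sample_response = json.dumps({
    "host_components": [
        {"HostRoles": {"host_name": "zk0-kafka.internal.cloudapp.net"}},
        {"HostRoles": {"host_name": "zk1-kafka.internal.cloudapp.net"}},
    ]
})

def join_hosts(response_text: str, port: int) -> str:
    """Mimic the jq filter: collect host names, append a port, join with commas."""
    data = json.loads(response_text)
    hosts = [hc["HostRoles"]["host_name"] for hc in data["host_components"]]
    return ",".join(f"{h}:{port}" for h in hosts)

# A comma-separated list like this is what you paste in as YOUR_ZOOKEEPER_HOSTS.
print(join_hosts(sample_response, 2181))
```

The broker command is identical except that it queries the KAFKA_BROKER component and uses port 9092.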
-2. In your web browser, connect to the Jupyter notebook on your Spark cluster. In the following URL, replace `CLUSTERNAME` with the name of your __Spark__ cluster:
-
-   https://CLUSTERNAME.azurehdinsight.net/jupyter
-
-   When prompted, enter the cluster login (admin) and password used when you created the cluster.
+1. From a web browser, navigate to `https://CLUSTERNAME.azurehdinsight.net/jupyter`, where `CLUSTERNAME` is the name of your cluster. When prompted, enter the cluster login (admin) and password used when you created the cluster.
 
-3. Select **New > Spark** to create a notebook.
+1. Select **New > Spark** to create a notebook.
 
-4. Spark streaming has microbatching, which means data comes as batches and executers run on the batches of data. If the executor has idle timeout less than the time it takes to process the batch then the executors would be constantly added and removed. If the executors idle timeout is greater than the batch duration, the executor never gets removed. Hence **we recommend that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.**
+1. Spark streaming uses micro-batching: data arrives in batches, and executors run on those batches. If the executor's idle timeout is less than the time it takes to process a batch, executors are constantly added and removed. If the executor's idle timeout is greater than the batch duration, the executor is never removed. Hence **we recommend that you disable dynamic allocation by setting spark.dynamicAllocation.enabled to false when running streaming applications.**
 
    Load packages used by the Notebook by entering the following information in a Notebook cell. Run the command by using **CTRL + ENTER**.
 
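The dynamic-allocation recommendation above rests on a timing argument that a toy model makes concrete (this illustrates the reasoning only; it is not Spark's actual allocation algorithm): an executor sits idle between micro-batches, and if that idle gap exceeds the idle timeout, dynamic allocation reclaims the executor only for the next batch to request a new one.

```python
def executor_churns(batch_interval_s: float, processing_time_s: float,
                    idle_timeout_s: float) -> bool:
    """Toy model: an executor is idle from the end of one micro-batch to the
    start of the next. If that idle gap exceeds the idle timeout, dynamic
    allocation removes the executor, and the next batch must re-request one."""
    idle_gap = max(batch_interval_s - processing_time_s, 0.0)
    return idle_gap > idle_timeout_s

# 10 s trigger interval, 2 s of processing, 5 s idle timeout:
# the 8 s idle gap exceeds the timeout, so executors churn every batch.
print(executor_churns(10.0, 2.0, 5.0))
# With a 60 s idle timeout the executor survives the gap and is never removed.
print(executor_churns(10.0, 2.0, 60.0))
```

Disabling dynamic allocation sidesteps this cycle entirely for long-running streaming jobs.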
@@ -196,7 +194,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    }
    ```
 
-5. Create the Kafka topic. Edit the command below by replacing `YOUR_ZOOKEEPER_HOSTS` with the Zookeeper host information extracted in the first step. Enter the edited command in your Jupyter Notebook to create the `tripdata` topic.
+1. Create the Kafka topic. Edit the command below by replacing `YOUR_ZOOKEEPER_HOSTS` with the Zookeeper host information extracted in the first step. Enter the edited command in your Jupyter Notebook to create the `tripdata` topic.
 
    ```scala
    %%bash
@@ -205,7 +203,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic tripdata --zookeeper $KafkaZookeepers
    ```
 
-6. Retrieve data on taxi trips. Enter the command in the next cell to load data on taxi trips in New York City. The data is loaded into a dataframe and then the dataframe is displayed as the cell output.
+1. Retrieve data on taxi trips. Enter the command in the next cell to load data on taxi trips in New York City. The data is loaded into a dataframe and then the dataframe is displayed as the cell output.
 
    ```scala
    import spark.implicits._
@@ -221,7 +219,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    taxiDF.show()
    ```
 
-7. Set the Kafka broker hosts information. Replace `YOUR_KAFKA_BROKER_HOSTS` with the broker hosts information you extracted in step 1. Enter the edited command in the next Jupyter Notebook cell.
+1. Set the Kafka broker hosts information. Replace `YOUR_KAFKA_BROKER_HOSTS` with the broker hosts information you extracted in step 1. Enter the edited command in the next Jupyter Notebook cell.
 
    ```scala
    // The Kafka broker hosts and topic used to write to Kafka
@@ -231,7 +229,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    println("Finished setting Kafka broker and topic configuration.")
    ```
 
-8. Send the data to Kafka. In the following command, the `vendorid` field is used as the key value for the Kafka message. The key is used by Kafka when partitioning data. All of the fields are stored in the Kafka message as a JSON string value. Enter the following command in Jupyter to save the data to Kafka using a batch query.
+1. Send the data to Kafka. In the following command, the `vendorid` field is used as the key value for the Kafka message. The key is used by Kafka when partitioning data. All of the fields are stored in the Kafka message as a JSON string value. Enter the following command in Jupyter to save the data to Kafka using a batch query.
 
    ```scala
    // Select the vendorid as the key and save the JSON string as the value.
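The reason the step above picks `vendorid` as the message key is that Kafka chooses a partition as a deterministic function of the key's hash, so every record with the same key lands in the same partition (preserving per-key ordering). A simplified sketch of that property (real Kafka clients hash keys with murmur2, not `zlib.crc32`, so actual partition numbers differ):

```python
import zlib

NUM_PARTITIONS = 8  # matches the tripdata topic created earlier

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Deterministically map a message key to a partition.
    Simplified stand-in for a Kafka producer's default key partitioner."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Every record with the same vendorid maps to the same partition.
p1 = partition_for("1")
p2 = partition_for("1")
print(p1 == p2)  # True
```

Records with a null key, by contrast, are spread across partitions by the producer rather than pinned to one.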
@@ -240,7 +238,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    println("Data sent to Kafka")
    ```
 
-9. Declare a schema. The following command demonstrates how to use a schema when reading JSON data from kafka. Enter the command in your next Jupyter cell.
+1. Declare a schema. The following command demonstrates how to use a schema when reading JSON data from Kafka. Enter the command in your next Jupyter cell.
 
    ```scala
    // Import bits used for declaring schemas and working with JSON data
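Conceptually, applying a declared schema to JSON read from Kafka means parsing each message value and coercing the named fields to their declared types, discarding anything not in the schema. A minimal illustration in plain Python (the two fields echo the taxi data; this is not Spark's implementation, just the idea behind it):

```python
import json

# Hypothetical two-field schema echoing the taxi trip data.
SCHEMA = {"vendorid": int, "tpep_pickup_datetime": str}

def apply_schema(message_value: str, schema: dict) -> dict:
    """Parse a JSON message value, keep only the declared fields,
    and coerce each one to its declared type; unknown fields are dropped."""
    raw = json.loads(message_value)
    return {name: typ(raw[name]) for name, typ in schema.items() if name in raw}

record = apply_schema(
    '{"vendorid": "2", "tpep_pickup_datetime": "2019-06-30 23:59:56", "extra": 0.5}',
    SCHEMA,
)
print(record)
```

Declaring the schema up front is what lets the streaming query project typed columns out of the raw Kafka `value` bytes.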
@@ -276,7 +274,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    println("Schema declared")
    ```
 
-10. Select data and start the stream. The following command demonstrates how to retrieve data from kafka using a batch query, and then write the results out to HDFS on the Spark cluster. In this example, the `select` retrieves the message (value field) from Kafka and applies the schema to it. The data is then written to HDFS (WASB or ADL) in parquet format. Enter the command in your next Jupyter cell.
+1. Select data and start the stream. The following command demonstrates how to retrieve data from Kafka using a batch query, and then write the results out to HDFS on the Spark cluster. In this example, the `select` retrieves the message (value field) from Kafka and applies the schema to it. The data is then written to HDFS (WASB or ADL) in Parquet format. Enter the command in your next Jupyter cell.
 
    ```scala
    // Read a batch from Kafka
@@ -288,14 +286,14 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    println("Wrote data to file")
    ```
 
-11. You can verify that the files were created by entering the command in your next Jupyter cell. It lists the files in the `/example/batchtripdata` directory.
+1. You can verify that the files were created by entering the command in your next Jupyter cell. It lists the files in the `/example/batchtripdata` directory.
 
    ```scala
    %%bash
    hdfs dfs -ls /example/batchtripdata
    ```
 
-12. While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. Enter the command in your next Jupyter cell.
+1. While the previous example used a batch query, the following command demonstrates how to do the same thing using a streaming query. Enter the command in your next Jupyter cell.
 
    ```scala
    // Stream from Kafka
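The difference between the batch query and the streaming query in the steps above is when reading stops: a batch read consumes what is currently in the topic and finishes, while a streaming read keeps polling and processes newly arrived records as micro-batches. A language-agnostic sketch of that control flow (plain Python, nothing Spark- or Kafka-specific; the two-records-per-trigger pacing is invented for the example):

```python
from typing import Iterable, Iterator, List

def batch_query(topic: List[str]) -> List[str]:
    """Batch semantics: process everything currently in the topic, then stop."""
    return [msg.upper() for msg in topic]

def streaming_query(topic: Iterable[str], max_batches: int) -> Iterator[List[str]]:
    """Streaming semantics: on each trigger, pull whatever has arrived and
    process it as a micro-batch. `max_batches` stands in for a stop signal."""
    it = iter(topic)
    for _ in range(max_batches):
        micro_batch = []
        for _ in range(2):  # pretend two records arrive per trigger interval
            try:
                micro_batch.append(next(it))
            except StopIteration:
                break
        if micro_batch:
            yield [msg.upper() for msg in micro_batch]

messages = ["trip1", "trip2", "trip3"]
print(batch_query(messages))               # one shot over the whole topic
print(list(streaming_query(messages, 5)))  # a sequence of micro-batches
```

In the real tutorial the same `select` and Parquet sink are reused; only `read`/`write` becomes `readStream`/`writeStream`.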
@@ -306,7 +304,7 @@ This example demonstrates how to use Spark Structured Streaming with Kafka on HD
    println("Wrote data to file")
    ```
 
-13. Run the following cell to verify that the files were written by the streaming query.
+1. Run the following cell to verify that the files were written by the streaming query.
 
    ```scala
    %%bash
@@ -319,7 +317,7 @@ To clean up the resources created by this tutorial, you can delete the resource
 
 To remove the resource group using the Azure portal:
 
-1. In the Azure portal, expand the menu on the left side to open the menu of services, and then choose __Resource Groups__ to display the list of your resource groups.
+1. In the [Azure portal](https://portal.azure.com/), expand the menu on the left side to open the menu of services, and then choose __Resource Groups__ to display the list of your resource groups.
 2. Locate the resource group to delete, and then right-click the __More__ button (...) on the right side of the listing.
 3. Select __Delete resource group__, and then confirm.
 
@@ -330,7 +328,7 @@ To remove the resource group using the Azure portal:
 
 ## Next steps
 
-In this tutorial, you learned how to use [Apache Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) to write and read data from [Apache Kafka](https://kafka.apache.org/) on HDInsight. Use the following link to learn how to use [Apache Storm](https://storm.apache.org/) with Kafka.
+In this tutorial, you learned how to use [Apache Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) to write and read data from [Apache Kafka](./kafka/apache-kafka-introduction.md) on HDInsight. Use the following link to learn how to use [Apache Storm](./storm/apache-storm-overview.md) with Kafka.
 
 > [!div class="nextstepaction"]
 > [Use Apache Storm with Apache Kafka](hdinsight-apache-storm-with-kafka.md)
