You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: articles/hdinsight/kafka/apache-kafka-producer-consumer-api.md
+33-6Lines changed: 33 additions & 6 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -69,7 +69,7 @@ The important things to understand in the `pom.xml` file are:
69
69
70
70
### Producer.java
71
71
72
-
The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet is from the [Producer.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java) file from the [GitHub repository](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started) and shows how to set the producer properties:
72
+
The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet is from the [Producer.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Producer-Consumer/src/main/java/com/microsoft/example/Producer.java) file from the [GitHub repository](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started) and shows how to set the producer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
73
73
74
74
```java
75
75
Properties properties = new Properties();
@@ -83,7 +83,7 @@ KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
83
83
84
84
### Consumer.java
85
85
86
-
The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet from the [Consumer.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java) file sets the consumer properties:
86
+
The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet from the [Consumer.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Producer-Consumer/src/main/java/com/microsoft/example/Consumer.java) file sets the consumer properties. For Enterprise Security Enabled clusters an additional property must be added "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");"
87
87
88
88
```java
89
89
KafkaConsumer<String, String> consumer;
@@ -109,7 +109,17 @@ In this code, the consumer is configured to read from the start of the topic (`a
109
109
110
110
The [Run.java](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Producer-Consumer/src/main/java/com/microsoft/example/Run.java) file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they'll load balance reading from the topic.
111
111
112
-
## Build and deploy the example
112
+
## Use Pre-built JAR files
113
+
114
+
Download the jars from the [Kafka Get Started Azure sample](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/tree/master/Prebuilt-Jars). If your cluster is **Enterprise Security Package (ESP)** enabled, use kafka-producer-consumer-esp.jar. Use the command below to copy the jars to your cluster.
If you would like to skip this step, prebuilt jars can be downloaded from the `Prebuilt-Jars` subdirectory. Download the kafka-producer-consumer.jar. If your cluster is **Enterprise Security Package (ESP)** enabled, use kafka-producer-consumer-esp.jar. Execute step 3 to copy the jar to your HDInsight cluster.
115
125
@@ -121,15 +131,16 @@ If you would like to skip this step, prebuilt jars can be downloaded from the `P
121
131
mvn clean package
122
132
```
123
133
124
-
This command creates a directory named `target`, that contains a file named `kafka-producer-consumer-1.0-SNAPSHOT.jar`.
134
+
This command creates a directory named `target`, that contains a file named `kafka-producer-consumer-1.0-SNAPSHOT.jar`. For ESP clusters the file will be `kafka-producer-consumer-esp-1.0-SNAPSHOT.jar`
125
135
126
136
3. Replace `sshuser` with the SSH user for your cluster, and replace `CLUSTERNAME` with the name of your cluster. Enter the following command to copy the `kafka-producer-consumer-1.0-SNAPSHOT.jar` file to your HDInsight cluster. When prompted enter the password for the SSH user.
This conversation was marked as resolved by anusricorp
133
144
134
145
1. Replace `sshuser` with the SSH user for your cluster, and replace `CLUSTERNAME` with the name of your cluster. Open an SSH connection to the cluster, by entering the following command. If prompted, enter the password for the SSH user account.
135
146
@@ -148,7 +159,6 @@ If you would like to skip this step, prebuilt jars can be downloaded from the `P
148
159
149
160
> [!Note]
150
161
> This command requires Ambari access. If your cluster is behind an NSG, run this command from a machine that can access Ambari.
151
-
152
162
1. Create Kafka topic, `myTest`, by entering the following command:
153
163
154
164
```bash
@@ -165,6 +175,7 @@ If you would like to skip this step, prebuilt jars can be downloaded from the `P
The records read, along with a count of records, is displayed.
@@ -200,6 +211,22 @@ Consumption by clients within the same group is handled through the partitions f
200
211
201
212
Records stored in Kafka are stored in the order they're received within a partition. To achieve in-ordered delivery for records *within a partition*, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-ordered delivery for records *within the topic*, create a consumer group with only one consumer instance.
202
213
214
+
## Common Issues faced
215
+
216
+
1. Topic creation fails
217
+
218
+
219
+
If your cluster is Enterprise Security Pack enabled, use the [pre-built JAR files for producer and consumer](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/blob/master/Prebuilt-Jars/kafka-producer-consumer-esp.jar).
220
+
221
+
222
+
The ESP jar can be built from from the code in the [`DomainJoined-Producer-Consumer` subdirectory](https://github.com/Azure-Samples/hdinsight-kafka-java-get-started/tree/master/DomainJoined-Producer-Consumer). Note that the producer and consumer properties ave an additional property `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG` for ESP enabled clusters.
223
+
224
+
225
+
2. Facing issue with ESP enabled clusters
226
+
227
+
If produce and consume operations fail and you are using an ESP enabled cluster, check that the user `kafka` is present in all Ranger policies. If it is not present, add it to all Ranger policies.
228
+
229
+
203
230
## Clean up resources
204
231
205
232
To clean up the resources created by this tutorial, you can delete the resource group. Deleting the resource group also deletes the associated HDInsight cluster, and any other resources associated with the resource group.
0 commit comments