Commit 56f275d: Modify Flink IOT page

Author: Sreekanth Iyer (Ushta Te Consultancy Services)
1 parent: 71fbe4a

1 file changed: 9 additions & 188 deletions
@@ -1,9 +1,9 @@
 ---
 title: Process real-time IoT data on Apache Flink® with Azure HDInsight on AKS
-description: How to integrate Azure IoT Hub and Apache Flink®
+description: How to integrate Azure IoT Hub and Apache Flink®.
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 10/03/2023
+ms.date: 04/04/2024
 ---

 # Process real-time IoT data on Apache Flink® with Azure HDInsight on AKS
@@ -12,203 +12,24 @@ Azure IoT Hub is a managed service hosted in the cloud that acts as a central me

 ## Prerequisites

-1. [Create an Azure IoT Hub](/azure/iot-hub/iot-hub-create-through-portal/)
-2. [Create Flink cluster on HDInsight on AKS](./flink-create-cluster-portal.md)
+* [Create an Azure IoT Hub](/azure/iot-hub/iot-hub-create-through-portal/)
+* [Create Flink cluster 1.17.0 on HDInsight on AKS](./flink-create-cluster-portal.md)
+* Use MSI to access ADLS Gen2
+* IntelliJ IDEA for development

-## Configure Flink cluster
+> [!NOTE]
+> For this demonstration, we use a Windows VM as the Maven project development environment, in the same VNET as HDInsight on AKS.

-Add ABFS storage account keys in your Flink cluster's configuration.
-
-Add the following configuration:
-
-`fs.azure.account.key.<your storage account's dfs endpoint> = <your storage account's shared access key>`

 :::image type="content" source="./media/azure-iot-hub/configuration-management.png" alt-text="Diagram showing search bar in Azure portal." lightbox="./media/azure-iot-hub/configuration-management.png":::
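The removed section above configured ABFS with a shared account key, while the updated prerequisites call for MSI instead. As an editorial sketch (not from the original article): managed-identity access is configured through Hadoop's ABFS OAuth properties rather than an account key. The property names below are standard `hadoop-azure` settings; the client-ID value is a placeholder.

```java
import java.util.Properties;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Properties props = new Properties();
// Authenticate to ADLS Gen2 with OAuth instead of a shared key.
props.put("fs.azure.account.auth.type", "OAuth");
// MsiTokenProvider ships with hadoop-azure and uses the cluster's managed identity.
props.put("fs.azure.account.oauth.provider.type",
        "org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider");
// Optional: pin a user-assigned managed identity (placeholder value).
props.put("fs.azure.account.oauth2.client.id", "<your managed identity client id>");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(ConfigurationUtils.createConfiguration(props));
```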

-## Writing the Flink job
-
-### Set up configuration for ABFS
-
-```java
-// Hand the storage account key to Flink so the ABFS file system can authenticate.
-Properties props = new Properties();
-props.put(
-    "fs.azure.account.key.<your storage account's dfs endpoint>",
-    "<your storage account's shared access key>"
-);
-
-Configuration conf = ConfigurationUtils.createConfiguration(props);
-
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-```
-
-This setup is required for Flink to authenticate with your ABFS storage account and write data to it.
-
-### Defining the IoT Hub source
-
-IoT Hub is built on top of Event Hubs and therefore supports a Kafka-compatible API. In our Flink job, we can define a `KafkaSource` with the appropriate parameters to consume messages from IoT Hub.
-
-```java
-String connectionString = "<your iot hub connection string>";
-
-KafkaSource<String> source = KafkaSource.<String>builder()
-    // Service Bus URL of the underlying Event Hubs namespace, with the Kafka TLS port.
-    .setBootstrapServers("<your iot hub's service bus url>:9093")
-    .setTopics("<name of your iot hub>")
-    .setGroupId("$Default")
-    .setProperty("partition.discovery.interval.ms", "10000")
-    .setProperty("security.protocol", "SASL_SSL")
-    .setProperty("sasl.mechanism", "PLAIN")
-    // Authenticate with the IoT Hub connection string as the SASL password.
-    .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
-    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-    .setValueOnlyDeserializer(new SimpleStringSchema())
-    .build();
-
-DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
-kafka.print();
-```
-
-You can find the connection string for your IoT Hub under its built-in endpoints:

 :::image type="content" source="./media/azure-iot-hub/built-in-endpoint.png" alt-text="Screenshot shows built-in endpoints." lightbox="./media/azure-iot-hub/built-in-endpoint.png":::

-Within the connection string, you can find a Service Bus URL (the URL of the underlying Event Hubs namespace), which you need to set as the bootstrap server in your Kafka source. In this case, it is: `iothub-ns-sagiri-iot-25146639-20dff4e426.servicebus.windows.net:9093`
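As an editorial aside (not in the original article), the bootstrap server can also be derived programmatically. This sketch assumes the built-in endpoint's usual `Endpoint=sb://<namespace>.servicebus.windows.net/;...` connection-string format; the sample values are placeholders.

```java
// Minimal sketch: derive the Kafka bootstrap server from an IoT Hub
// (Event Hubs-compatible) connection string.
public class BootstrapServerSketch {
    static String bootstrapServer(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith("Endpoint=sb://")) {
                String host = part.substring("Endpoint=sb://".length());
                host = host.endsWith("/") ? host.substring(0, host.length() - 1) : host;
                return host + ":9093"; // 9093 is the Kafka-compatible TLS port
            }
        }
        throw new IllegalArgumentException("No Endpoint segment in connection string");
    }

    public static void main(String[] args) {
        String cs = "Endpoint=sb://iothub-ns-sample.servicebus.windows.net/;SharedAccessKeyName=service;SharedAccessKey=<key>;EntityPath=<hub>";
        System.out.println(bootstrapServer(cs)); // iothub-ns-sample.servicebus.windows.net:9093
    }
}
```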
-
-### Defining the ABFS sink
-
-```java
-String outputPath = "abfs://<container name>@<your storage account's dfs endpoint>";
-
-final FileSink<String> sink = FileSink
-    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
-    .withRollingPolicy(
-        DefaultRollingPolicy.builder()
-            // Roll part files every 2 minutes, after 3 minutes of inactivity,
-            // or once a part file reaches 5 MB.
-            .withRolloverInterval(Duration.ofMinutes(2))
-            .withInactivityInterval(Duration.ofMinutes(3))
-            .withMaxPartSize(MemorySize.ofMebiBytes(5))
-            .build())
-    .build();
-
-kafka.sinkTo(sink);
-```
-
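A caveat worth adding (an editorial note, not in the original article): `FileSink` only finalizes its in-progress part files when a checkpoint completes, so a streaming job without checkpointing never commits output to ABFS. A minimal sketch, added to the environment set up above:

```java
// FileSink moves part files from in-progress to finished state on checkpoints;
// without this, output files are never committed. The interval is an example value.
env.enableCheckpointing(10_000); // milliseconds
```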
-### Flink job code
-
-```java
-package org.example;
-
-import java.time.Duration;
-import java.util.Properties;
-import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.connector.file.sink.FileSink;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-
-public class StreamingJob {
-    public static void main(String[] args) throws Throwable {
-
-        // ABFS authentication: pass the storage account key to Flink.
-        Properties props = new Properties();
-        props.put(
-            "fs.azure.account.key.<your storage account's dfs endpoint>",
-            "<your storage account's shared access key>"
-        );
-
-        Configuration conf = ConfigurationUtils.createConfiguration(props);
-
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-
-        // Source: consume from IoT Hub through its Kafka-compatible endpoint.
-        String connectionString = "<your iot hub connection string>";
-
-        KafkaSource<String> source = KafkaSource.<String>builder()
-            .setBootstrapServers("<your iot hub's service bus url>:9093")
-            .setTopics("<name of your iot hub>")
-            .setGroupId("$Default")
-            .setProperty("partition.discovery.interval.ms", "10000")
-            .setProperty("security.protocol", "SASL_SSL")
-            .setProperty("sasl.mechanism", "PLAIN")
-            .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
-            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
-            .setValueOnlyDeserializer(new SimpleStringSchema())
-            .build();
-
-        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
-        kafka.print();
-
-        // Sink: write the raw messages to ABFS as rolling text files.
-        String outputPath = "abfs://<container name>@<your storage account's dfs endpoint>";
-
-        final FileSink<String> sink = FileSink
-            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
-            .withRollingPolicy(
-                DefaultRollingPolicy.builder()
-                    .withRolloverInterval(Duration.ofMinutes(2))
-                    .withInactivityInterval(Duration.ofMinutes(3))
-                    .withMaxPartSize(MemorySize.ofMebiBytes(5))
-                    .build())
-            .build();
-
-        kafka.sinkTo(sink);
-
-        env.execute("Azure-IoTHub-Flink-ABFS");
-    }
-}
-```
-
-#### Maven dependencies
-
-```xml
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-java</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-streaming-java</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-streaming-scala_2.12</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-clients</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-connector-kafka</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-<dependency>
-    <groupId>org.apache.flink</groupId>
-    <artifactId>flink-connector-files</artifactId>
-    <version>${flink.version}</version>
-</dependency>
-```
-
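The dependency list references a `${flink.version}` property that this excerpt never defines. Assuming it matches the Flink 1.17.0 cluster named in the prerequisites (an inference, not stated in the source), the pom would declare it as:

```xml
<properties>
    <!-- Assumed from the "Create Flink cluster 1.17.0" prerequisite. -->
    <flink.version>1.17.0</flink.version>
</properties>
```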
-### Submit job
-
-Submit the job using HDInsight on AKS's [Flink job submission API](./flink-job-management.md).
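For orientation only: the exact endpoint and request body are defined in the job-management article linked above, and every angle-bracketed value below is a placeholder, not a documented name. A generic sketch of posting a job definition to a REST API from Java:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitJobSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical values: take the real endpoint, token, and payload
        // from the Flink job management documentation.
        String endpoint = "<job submission endpoint from the linked doc>";
        String token = "<bearer token>";
        String payload = "<JSON job definition from the linked doc>";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(endpoint))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```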

 :::image type="content" source="./media/azure-iot-hub/create-new-job.png" alt-text="Screenshot shows create a new job." lightbox="./media/azure-iot-hub/create-new-job.png":::

 ### Reference

 - [Apache Flink Website](https://flink.apache.org/)
-- Apache, Apache Kafka, Kafka, Apache Flink, Flink, and associated open source project names are [trademarks](../trademarks.md) of the [Apache Software Foundation](https://www.apache.org/) (ASF).
+- Apache, Apache Kafka, Kafka, Apache Flink, Flink, and associated open source project names are [trademarks](../trademarks.md) of the [Apache Software Foundation](https://www.apache.org/) (ASF).
