Commit 0178402

Merge pull request #271177 from sreekzz/Kafka-IOT

Modify Flink IOT page

2 parents a778936 + 2e4c7d1

7 files changed: 144 additions, 152 deletions
---
title: Process real-time IoT data on Apache Flink® with Azure HDInsight on AKS
description: How to integrate Azure IoT Hub and Apache Flink®.
ms.service: hdinsight-aks
ms.topic: how-to
ms.date: 04/04/2024
---

# Process real-time IoT data on Apache Flink® with Azure HDInsight on AKS

Azure IoT Hub is a managed service hosted in the cloud that acts as a central message hub for communication between an IoT application and its attached devices. You can connect millions of devices and their backend solutions reliably and securely. Almost any device can be connected to an IoT hub.

In this example, the code processes real-time IoT data on Apache Flink® with Azure HDInsight on AKS and sinks it to ADLS Gen2 storage.

## Prerequisites

* [Create an Azure IoT Hub](/azure/iot-hub/iot-hub-create-through-portal/)
* [Create a Flink cluster 1.17.0 on HDInsight on AKS](./flink-create-cluster-portal.md)
* Use MSI to access ADLS Gen2
* IntelliJ for development

> [!NOTE]
> For this demonstration, we use a Windows VM as the Maven project development environment, in the same VNET as HDInsight on AKS.

## Flink cluster 1.17.0 on HDInsight on AKS

:::image type="content" source="./media/azure-iot-hub/configuration-management.png" alt-text="Diagram showing search bar in Azure portal." lightbox="./media/azure-iot-hub/configuration-management.png":::

## Azure IoT Hub on the Azure portal

Within the connection string, you can find the service bus URL (the URL of the underlying event hub namespace), which you need to add as a bootstrap server in your Kafka source. In this example, it's `iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093`.

:::image type="content" source="./media/azure-iot-hub/built-in-endpoint.png" alt-text="Screenshot shows built-in endpoints." lightbox="./media/azure-iot-hub/built-in-endpoint.png":::
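
If you'd rather derive the bootstrap server programmatically, here's a minimal sketch, assuming the built-in endpoint connection string follows the usual `Endpoint=sb://<namespace>.servicebus.windows.net/;...` format (the class and method names are ours, not part of any SDK):

```java
// Hypothetical helper (not part of any SDK): extract the Kafka bootstrap server
// from the IoT hub's built-in (Event Hubs-compatible) endpoint connection string.
public final class IotHubEndpoints {
    public static String bootstrapServerFrom(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith("Endpoint=sb://")) {
                // Strip the scheme and any trailing slash, then append the Kafka TLS port.
                String host = part.substring("Endpoint=sb://".length()).replaceAll("/+$", "");
                return host + ":9093";
            }
        }
        throw new IllegalArgumentException("No Endpoint found in connection string");
    }
}
```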

## Prepare messages for an Azure IoT device

Each IoT hub comes with built-in system endpoints to handle system and device messages.

For more information, see [How to use VS Code as IoT Hub Device Simulator](https://devblogs.microsoft.com/iotdev/use-vs-code-as-iot-hub-device-simulator-say-hello-to-azure-iot-hub-in-5-minutes/).

:::image type="content" source="./media/azure-iot-hub/send-messages.png" alt-text="Screenshot shows how to send messages." lightbox="./media/azure-iot-hub/send-messages.png":::
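
If you prefer sending test telemetry from code instead of the simulator, here's a minimal sketch using the v1 Azure IoT device SDK for Java (`com.microsoft.azure.sdk.iot:iot-device-client`); the device connection string and payload fields are placeholders for illustration:

```java
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.Message;

public class SendTestTelemetry {
    public static void main(String[] args) throws Exception {
        // Device connection string: "HostName=<hub>.azure-devices.net;DeviceId=<id>;SharedAccessKey=<key>"
        DeviceClient client = new DeviceClient("<your device connection string>", IotHubClientProtocol.MQTT);
        client.open();

        // Arbitrary sample payload; the Flink job downstream treats each message as a UTF-8 string.
        String payload = "{\"deviceId\":\"contoso-device-1\",\"temperature\":22.5}";
        client.sendEventAsync(new Message(payload),
                (status, context) -> System.out.println("Send status: " + status), null);

        Thread.sleep(2000); // give the async send time to complete before closing
        client.closeNow();
    }
}
```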

## Code in Flink

`IOTdemo.java`

- KafkaSource:
  IoT Hub is built on top of Event Hubs and hence supports a Kafka-like API. So, in our Flink job, we can define a KafkaSource with appropriate parameters to consume messages from IoT Hub.

- FileSink:
  Define the ABFS sink.

```java
package contoso.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

import java.time.Duration;

public class IOTdemo {

    public static void main(String[] args) throws Exception {

        // create execution environment
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        String connectionString = "<your iot hub connection string>";

        // Consume from the IoT hub's Event Hubs-compatible (Kafka) endpoint over SASL_SSL.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("<your iot hub's service bus url>:9093")
                .setTopics("<name of your iot hub>")
                .setGroupId("$Default")
                .setProperty("partition.discovery.interval.ms", "10000")
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";

        // Roll part files every 2 minutes, after 3 minutes of inactivity, or at 5 MB.
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(2))
                                .withInactivityInterval(Duration.ofMinutes(3))
                                .withMaxPartSize(MemorySize.ofMebiBytes(5))
                                .build())
                .build();

        kafka.sinkTo(sink);

        env.execute("Sink Azure IOT hub to ADLS gen2");
    }
}
```
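
> [!NOTE]
> Flink's `FileSink` finalizes part files only on successful checkpoints, so if checkpointing isn't already enabled on your cluster, output files may stay in the in-progress/pending state. A one-line sketch to enable it in the job itself, before `env.execute` (the 60-second interval is an arbitrary choice for this demo):

```java
// Enable checkpointing so the FileSink can commit part files to ADLS Gen2.
env.enableCheckpointing(60000);
```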

**Maven pom.xml**
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>contoso.example</groupId>
    <artifactId>FlinkIOTDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

## Package the jar and submit the job in the Flink cluster

Package the Maven project (for example, `mvn clean package`), upload the resulting jar into the webssh pod, and submit it:

```
user@sshnode-0 [ ~ ]$ bin/flink run -c contoso.example.IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
```

## Check the job on the Flink Dashboard UI

:::image type="content" source="./media/azure-iot-hub/flink-ui-dashboard.png" alt-text="Screenshot showing the Flink UI dashboard." lightbox="./media/azure-iot-hub/flink-ui-dashboard.png":::

## Check the result in ADLS Gen2 on the Azure portal

:::image type="content" source="./media/azure-iot-hub/check-results.png" alt-text="Screenshot showing the results." lightbox="./media/azure-iot-hub/check-results.png":::

### Reference

- [Apache Flink Website](https://flink.apache.org/)
- Apache, Apache Kafka, Kafka, Apache Flink, Flink, and associated open source project names are [trademarks](../trademarks.md) of the [Apache Software Foundation](https://www.apache.org/) (ASF).