Skip to content

Commit 3d07015

Browse files
author
Sreekanth Iyer (Ushta Te Consultancy Services)
committed
Mofified this Page
1 parent 56f275d commit 3d07015

File tree

7 files changed

+172
-1
lines changed

7 files changed

+172
-1
lines changed

articles/hdinsight-aks/flink/azure-iot-hub.md

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ ms.date: 04/04/2024
1010

1111
Azure IoT Hub is a managed service hosted in the cloud that acts as a central message hub for communication between an IoT application and its attached devices. You can connect millions of devices and their backend solutions reliably and securely. Almost any device can be connected to an IoT hub.
1212

13+
In this example, the code processes real-time IoT data on Apache Flink® with Azure HDInsight on AKS and sinks to ADLS gen2 storage.
14+
1315
## Prerequisites
1416

1517
* [Create an Azure IoTHub](/azure/iot-hub/iot-hub-create-through-portal/)
@@ -20,14 +22,183 @@ Azure IoT Hub is a managed service hosted in the cloud that acts as a central me
2022
> [!NOTE]
2123
> For this demonstration, we are using a Window VM as maven project develop env in the same VNET as HDInsight on AKS.
2224
25+
## Flink cluster 1.17.0 on HDInsight on AKS
2326

2427
:::image type="content" source="./media/azure-iot-hub/configuration-management.png" alt-text="Diagram showing search bar in Azure portal." lightbox="./media/azure-iot-hub/configuration-management.png":::
2528

29+
## Azure IOT Hub on Azure portal
30+
31+
Within the connection string, you can find a service bus URL (URL of the underlying event hub namespace), which you need to add as a bootstrap server in your Kafka source. In this example, it's `iothub-ns-contosoiot-55642726-4642a54853.servicebus.windows.net:9093`.
2632

2733
:::image type="content" source="./media/azure-iot-hub/built-in-endpoint.png" alt-text="Screenshot shows built-in endpoints." lightbox="./media/azure-iot-hub/built-in-endpoint.png":::
2834

35+
## Prepare message into Azure IOT device
36+
37+
Each IoT hub comes with built-in system endpoints to handle system and device messages.
38+
39+
For more information, see [How to use VS Code as IoT Hub Device Simulator](https://devblogs.microsoft.com/iotdev/use-vs-code-as-iot-hub-device-simulator-say-hello-to-azure-iot-hub-in-5-minutes/).
40+
41+
42+
:::image type="content" source="./media/azure-iot-hub/send-messages.png" alt-text="Screenshot shows built-in endpoints." lightbox="./media/azure-iot-hub/send-messages.png":::
43+
44+
## Code in Flink
45+
46+
`IOTdemo.java`
47+
48+
- KafkaSource:
49+
IoTHub is build on top of event hub and hence supports a kafka-like API. So in our Flink job, we can define a KafkaSource with appropriate parameters to consume messages from IoTHub.
50+
51+
- FileSink:
52+
Define the ABFS sink.
53+
54+
55+
```
56+
package contoso.example
57+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
58+
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
59+
import org.apache.flink.api.common.serialization.SimpleStringSchema;
60+
import org.apache.flink.client.program.StreamContextEnvironment;
61+
import org.apache.flink.configuration.MemorySize;
62+
import org.apache.flink.connector.file.sink.FileSink;
63+
import org.apache.flink.connector.kafka.source.KafkaSource;
64+
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
65+
import org.apache.flink.core.fs.Path;
66+
import org.apache.flink.streaming.api.datastream.DataStream;
67+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
68+
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
69+
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
70+
71+
import java.time.Duration;
72+
public class IOTdemo {
73+
74+
public static void main(String[] args) throws Exception {
75+
76+
// create execution environment
77+
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
78+
79+
String connectionString = "<your iot hub connection string>";
80+
81+
KafkaSource<String> source = KafkaSource.<String>builder()
82+
.setBootstrapServers("<your iot hub's service bus url>:9093")
83+
.setTopics("<name of your iot hub>")
84+
.setGroupId("$Default")
85+
.setProperty("partition.discovery.interval.ms", "10000")
86+
.setProperty("security.protocol", "SASL_SSL")
87+
.setProperty("sasl.mechanism", "PLAIN")
88+
.setProperty("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";", connectionString))
89+
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
90+
.setValueOnlyDeserializer(new SimpleStringSchema())
91+
.build();
92+
93+
DataStream<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
94+
95+
String outputPath = "abfs://<container>@<account_name>.dfs.core.windows.net/flink/data/azureiothubmessage/";
96+
97+
final FileSink<String> sink = FileSink
98+
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
99+
.withRollingPolicy(
100+
DefaultRollingPolicy.builder()
101+
.withRolloverInterval(Duration.ofMinutes(2))
102+
.withInactivityInterval(Duration.ofMinutes(3))
103+
.withMaxPartSize(MemorySize.ofMebiBytes(5))
104+
.build())
105+
.build();
106+
107+
kafka.sinkTo(sink);
108+
109+
env.execute("Sink Azure IOT hub to ADLS gen2");
110+
}
111+
}
112+
```
113+
114+
115+
**Maven pom.xml**
116+
```xml
117+
<groupId>contoso.example</groupId>
118+
<artifactId>FlinkIOTDemo</artifactId>
119+
<version>1.0-SNAPSHOT</version>
120+
<properties>
121+
<maven.compiler.source>1.8</maven.compiler.source>
122+
<maven.compiler.target>1.8</maven.compiler.target>
123+
<flink.version>1.17.0</flink.version>
124+
<java.version>1.8</java.version>
125+
<scala.binary.version>2.12</scala.binary.version>
126+
</properties>
127+
<dependencies>
128+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
129+
<dependency>
130+
<groupId>org.apache.flink</groupId>
131+
<artifactId>flink-java</artifactId>
132+
<version>${flink.version}</version>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.apache.flink</groupId>
136+
<artifactId>flink-streaming-java</artifactId>
137+
<version>${flink.version}</version>
138+
</dependency>
139+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
140+
<dependency>
141+
<groupId>org.apache.flink</groupId>
142+
<artifactId>flink-clients</artifactId>
143+
<version>${flink.version}</version>
144+
</dependency>
145+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
146+
<dependency>
147+
<groupId>org.apache.flink</groupId>
148+
<artifactId>flink-connector-files</artifactId>
149+
<version>${flink.version}</version>
150+
</dependency>
151+
<dependency>
152+
<groupId>org.apache.flink</groupId>
153+
<artifactId>flink-connector-kafka</artifactId>
154+
<version>${flink.version}</version>
155+
</dependency>
156+
</dependencies>
157+
<build>
158+
<plugins>
159+
<plugin>
160+
<groupId>org.apache.maven.plugins</groupId>
161+
<artifactId>maven-assembly-plugin</artifactId>
162+
<version>3.0.0</version>
163+
<configuration>
164+
<appendAssemblyId>false</appendAssemblyId>
165+
<descriptorRefs>
166+
<descriptorRef>jar-with-dependencies</descriptorRef>
167+
</descriptorRefs>
168+
</configuration>
169+
<executions>
170+
<execution>
171+
<id>make-assembly</id>
172+
<phase>package</phase>
173+
<goals>
174+
<goal>single</goal>
175+
</goals>
176+
</execution>
177+
</executions>
178+
</plugin>
179+
</plugins>
180+
</build>
181+
</project>
182+
```
183+
184+
## Package the jar and submit the job in Flink cluster
185+
186+
Upload the jar into webssh pod and submit the jar.
187+
188+
```
189+
user@sshnode-0 [ ~ ]$ bin/flink run -c IOTdemo -j FlinkIOTDemo-1.0-SNAPSHOT.jar
190+
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
191+
SLF4J: Defaulting to no-operation (NOP) logger implementation
192+
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
193+
Job has been submitted with JobID de1931b1c1179e7530510b07b7ced858
194+
```
195+
## Check job on Flink Dashboard UI
196+
197+
:::image type="content" source="./media/azure-iot-hub/flink-ui-dashboard.png" alt-text="Screenshot showing the Flink UI dashboard." lightbox="./media/azure-iot-hub/flink-ui-dashboard.png":::
198+
199+
## Check Result on ADLS gen2 on Azure portal
29200

30-
:::image type="content" source="./media/azure-iot-hub/create-new-job.png" alt-text="Screenshot shows create a new job." lightbox="./media/azure-iot-hub/create-new-job.png":::
201+
:::image type="content" source="./media/azure-iot-hub/check-results.png" alt-text="Screenshot showing the results." lightbox="./media/azure-iot-hub/check-results.png":::
31202

32203
### Reference
33204

-106 KB
Loading
208 KB
Loading
-213 KB
Loading
Binary file not shown.
88.7 KB
Loading
113 KB
Loading

0 commit comments

Comments
 (0)