
Commit cd49761

Merge pull request #270909 from sreekzz/Flink-batch4
Flink updates Batch 4
2 parents ad71af3 + dbc5d6b commit cd49761

12 files changed (+68 −57 lines)

articles/hdinsight-aks/flink/azure-service-bus-demo.md

Lines changed: 4 additions & 4 deletions

````diff
@@ -1,9 +1,9 @@
 ---
 title: Use Apache Flink on HDInsight on AKS with Azure Service Bus
-description: Use Apache Flink DataStream API on HDInsight on AKS with Azure Service Bus
+description: Use Apache Flink DataStream API on HDInsight on AKS with Azure Service Bus.
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 11/27/2023
+ms.date: 04/02/2024
 ---
 # Use Apache Flink on HDInsight on AKS with Azure Service Bus

@@ -13,7 +13,7 @@ This article provides an overview and demonstration of Apache Flink DataStream A

 ## Prerequisites

-- [Flink Cluster 1.16.0 on HDInsight on AKS](./flink-create-cluster-portal.md)
+- [Flink Cluster 1.17.0 on HDInsight on AKS](./flink-create-cluster-portal.md)
 - For this demonstration, we use a Windows VM as the Maven project development environment in the same VNet as HDInsight on AKS.
 - During the [creation](./flink-create-cluster-portal.md) of the Flink cluster, ensure that SSH access is selected. This enables you to access the cluster using Secure Shell (SSH).
 - Set up an [Azure Service Bus](/azure/service-bus-messaging/service-bus-messaging-overview) instance.

@@ -76,7 +76,7 @@ In the POM.xml file, we define the project's dependencies using Maven, ensuring
     <properties>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <flink.version>1.16.0</flink.version>
+        <flink.version>1.17.0</flink.version>
         <java.version>1.8</java.version>
     </properties>
     <dependencies>
````
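For context on the `flink.version` bump above: the commit does not show the `<dependencies>` section, but a Flink 1.17 Maven project typically consumes the property along the lines of the following sketch (the specific artifacts are assumptions for illustration, not taken from this commit):

```xml
<!-- Hypothetical sketch: artifacts assumed for illustration; the article's real POM may differ -->
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

Because every Flink artifact resolves its version from the single `<flink.version>` property, the upgrade from 1.16.0 to 1.17.0 is a one-line change.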

articles/hdinsight-aks/flink/change-data-capture-connectors-for-apache-flink.md

Lines changed: 3 additions & 3 deletions

````diff
@@ -3,7 +3,7 @@ title: How to perform Change Data Capture of SQL Server with Apache Flink® Data
 description: Learn how to perform Change Data Capture of SQL Server with Apache Flink® DataStream API and DataStream Source.
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 03/22/2024
+ms.date: 04/02/2024
 ---

 # Change Data Capture of SQL Server with Apache Flink® DataStream API and DataStream Source on HDInsight on AKS

@@ -138,7 +138,7 @@ GO
 ```
 ##### Maven source code on IdeaJ

-In the below snippet, we use HDInsight Kafka 2.4.1. Based on your usage, update the version of Kafka on `<kafka.version>`.
+In the below snippet, we use Kafka 2.4.1. Based on your usage, update the version of Kafka on `<kafka.version>`.

 **maven pom.xml**

@@ -158,7 +158,7 @@ In the below snippet, we use HDInsight Kafka 2.4.1. Based on your usage, update
     <flink.version>1.17.0</flink.version>
     <java.version>1.8</java.version>
     <scala.binary.version>2.12</scala.binary.version>
-    <kafka.version>3.2.0</kafka.version> // Replace with 3.2 if you're using HDInsight Kafka 3.2
+    <kafka.version>3.2.0</kafka.version> //
 </properties>
 <dependencies>
     <dependency>
````
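The `<kafka.version>` property edited above is typically consumed by the Kafka client dependency, roughly as in this sketch (assumed for illustration; the article's full dependency list is truncated in the diff):

```xml
<!-- Hypothetical sketch: dependency assumed for illustration, not shown in this commit -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
```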

articles/hdinsight-aks/flink/flink-how-to-setup-event-hub.md

Lines changed: 51 additions & 45 deletions

````diff
@@ -3,7 +3,7 @@ title: How to connect Apache Flink® on HDInsight on AKS with Azure Event Hubs f
 description: Learn how to connect Apache Flink® on HDInsight on AKS with Azure Event Hubs for Apache Kafka®
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 08/29/2023
+ms.date: 04/02/2024
 ---

 # Connect Apache Flink® on HDInsight on AKS with Azure Event Hubs for Apache Kafka®

@@ -53,60 +53,66 @@ In this article, we explore how to connect [Azure Event Hubs](/azure/event-hubs/
 ## Packaging the JAR for Flink
 1. Package com.example.app;

-   ```
+   ```
+   package contoso.example;
+
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
+
+   import org.apache.flink.api.java.utils.ParameterTool;
+   import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+   import org.apache.flink.connector.kafka.sink.KafkaSink;
+
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; //v0.11.0.0
-   import java.io.FileNotFoundException;
+
    import java.io.FileReader;
    import java.util.Properties;
-
-   public class FlinkTestProducer {
-
-       private static final String TOPIC = "test";
-       private static final String FILE_PATH = "src/main/resources/producer.config";
-
-       public static void main(String... args) {
-           try {
-               Properties properties = new Properties();
-               properties.load(new FileReader(FILE_PATH));
-
-               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-               DataStream stream = createStream(env);
-               FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
-                   TOPIC,
-                   new SimpleStringSchema(), // serialization schema
-                   properties);
-
-               stream.addSink(myProducer);
-               env.execute("Testing flink print");
-
-           } catch(FileNotFoundException e){
-               System.out.println("FileNotFoundException: " + e);
-           } catch (Exception e) {
-               System.out.println("Failed with exception:: " + e);
-           }
-       }
-
-       public static DataStream createStream(StreamExecutionEnvironment env){
-           return env.generateSequence(0, 200)
-               .map(new MapFunction<Long, String>() {
-                   @Override
-                   public String map(Long in) {
-                       return "FLINK PRODUCE " + in;
-                   }
-               });
-       }
-   }
-   ```
-
+
+   public class AzureEventHubDemo {
+
+       public static void main(String[] args) throws Exception {
+           // 1. get stream execution environment
+           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
+           ParameterTool parameters = ParameterTool.fromArgs(args);
+           String input = parameters.get("input");
+           Properties properties = new Properties();
+           properties.load(new FileReader(input));
+
+           // 2. generate stream input
+           DataStream<String> stream = createStream(env);
+
+           // 3. sink to eventhub
+           KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(properties)
+               .setRecordSerializer(KafkaRecordSerializationSchema.builder()
+                   .setTopic("topic1")
+                   .setValueSerializationSchema(new SimpleStringSchema())
+                   .build())
+               .build();
+
+           stream.sinkTo(sink);
+
+           // 4. execute the stream
+           env.execute("Produce message to Azure event hub");
+       }
+
+       public static DataStream<String> createStream(StreamExecutionEnvironment env){
+           return env.generateSequence(0, 200)
+               .map(new MapFunction<Long, String>() {
+                   @Override
+                   public String map(Long in) {
+                       return "FLINK PRODUCE " + in;
+                   }
+               });
+       }
+   }
+   ```
+
 1. Add the snippet to run the Flink Producer.

 :::image type="content" source="./media/flink-eventhub/testing-flink.png" alt-text="Screenshot showing how to test Flink in Event Hubs." border="true" lightbox="./media/flink-eventhub/testing-flink.png":::

-1. Once the code is executed, the events are stored in the topic **"TEST"**
+1. Once the code is executed, the events are stored in the topic **"topic1"**

 :::image type="content" source="./media/flink-eventhub/events-stored-in-topic.png" alt-text="Screenshot showing Event Hubs stored in topic." border="true" lightbox="./media/flink-eventhub/events-stored-in-topic.png":::
````
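The rewritten producer above loads its Kafka properties from the file passed as `--input`. Against an Event Hubs Kafka endpoint, that file generally follows the standard Event Hubs-for-Kafka pattern sketched below (the namespace name and key are placeholders, not taken from this commit):

```properties
# Hypothetical producer.config sketch: replace <namespace> and <access-key> with your own values
bootstrap.servers=<namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<access-key>";
```

Event Hubs exposes a Kafka-compatible endpoint on port 9093, so the `KafkaSink` needs no Event Hubs-specific code beyond this SASL configuration.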
7 binary image files changed (size deltas: +32.8 KB, +2.64 KB, −18.8 KB, +138 KB, +99.1 KB, −65 KB, −22 KB)
