Commit f770e9a

Merge pull request #269058 from sreekzz/patch-9
Update use-hive-catalog.md
2 parents: aa8c1e0 + 4d1aeef

File tree: 10 files changed (+48 −41 lines)


articles/hdinsight-aks/flink/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2.md

Lines changed: 8 additions & 8 deletions
@@ -1,9 +1,9 @@
 ---
 title: Write event messages into Azure Data Lake Storage Gen2 with Apache Flink® DataStream API
-description: Learn how to write event messages into Azure Data Lake Storage Gen2 with Apache Flink® DataStream API
+description: Learn how to write event messages into Azure Data Lake Storage Gen2 with Apache Flink® DataStream API.
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 10/27/2023
+ms.date: 03/14/2024
 ---
 
 # Write event messages into Azure Data Lake Storage Gen2 with Apache Flink® DataStream API
@@ -22,11 +22,11 @@ Apache Flink uses file systems to consume and persistently store data, both for
 
 ## Apache Flink FileSystem connector
 
-This filesystem connector provides the same guarantees for both BATCH and STREAMING and is designed to provide exactly once semantics for STREAMING execution. For more information, see [Flink DataStream Filesystem](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem)
+This filesystem connector provides the same guarantees for both BATCH and STREAMING and is designed to provide exactly once semantics for STREAMING execution. For more information, see [Flink DataStream Filesystem](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem).
 
 ## Apache Kafka Connector
 
-Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly once guarantees. For more information, see [Apache Kafka Connector](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka)
+Flink provides an Apache Kafka connector for reading data from and writing data to Kafka topics with exactly once guarantees. For more information, see [Apache Kafka Connector](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kafka).
 
 ## Build the project for Apache Flink
 
@@ -36,7 +36,7 @@ Flink provides an Apache Kafka connector for reading data from and writing data
 <properties>
 <maven.compiler.source>1.8</maven.compiler.source>
 <maven.compiler.target>1.8</maven.compiler.target>
-<flink.version>1.16.0</flink.version>
+<flink.version>1.17.0</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <kafka.version>3.2.0</kafka.version>
@@ -163,17 +163,17 @@ public class KafkaSinkToGen2 {
 
 **Submit the job on Flink Dashboard UI**
 
-We are using Maven to package a jar onto local and submitting to Flink, and using Kafka to sink into ADLS Gen2
+We are using Maven to package a jar onto local and submitting to Flink, and using Kafka to sink into ADLS Gen2.
 
 :::image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/submit-the-job-flink-ui.png" alt-text="Screenshot showing jar submission to Flink dashboard.":::
-:::image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/submit-the-job-flink-ui-2.png" alt-text="Screenshot showing job running on Flink dashboard.":::
+:::Image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/submit-the-job-flink-ui-2.png" alt-text="Screenshot showing job running on Flink dashboard.":::
 
 **Validate streaming data on ADLS Gen2**
 
 We are seeing the `click_events` streaming into ADLS Gen2
 
 :::image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/validate-stream-azure-data-lake-storage-gen2-1.png" alt-text="Screenshot showing ADLS Gen2 output.":::
-:::image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/validate-stream-azure-data-lake-storage-gen2-2.png" alt-text="Screenshot showing Flink click event output.":::
+:::Image type="content" source="./media/assign-kafka-topic-event-message-to-azure-data-lake-storage-gen2/validate-stream-azure-data-lake-storage-gen2-2.png" alt-text="Screenshot showing Flink click event output.":::
 
 You can specify a rolling policy that rolls the in-progress part file on any of the following three conditions:

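The hunk ends just as the article introduces the rolling policy. For orientation, here is a minimal DataStream sketch of such a sink; it is not from the article — the `abfs://` path, class name, and thresholds are placeholders — but the builder calls follow the Flink 1.17 `FileSink` API that this commit's `<flink.version>` bump targets.

```java
package contoso.example; // hypothetical package, not from the article

import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class Gen2RollingSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // in-progress part files are finalized on checkpoints

        // Stand-in for the article's Kafka source.
        DataStream<String> events = env.fromElements("click,user1", "click,user2");

        FileSink<String> gen2Sink = FileSink
                .forRowFormat(
                        new Path("abfs://<container>@<account>.dfs.core.windows.net/data/click_events"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(Duration.ofMinutes(2))   // condition 1: maximum age of a part file
                        .withInactivityInterval(Duration.ofMinutes(3)) // condition 2: no new records for this long
                        .withMaxPartSize(MemorySize.ofMebiBytes(128))  // condition 3: size threshold reached
                        .build())
                .build();

        events.sinkTo(gen2Sink);
        env.execute("FileSink rolling policy sketch");
    }
}
```

With a row format, part files only move from in-progress to finished on a checkpoint, which is why the sketch enables checkpointing explicitly.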
articles/hdinsight-aks/flink/flink-catalog-delta-hive.md

Lines changed: 11 additions & 9 deletions
@@ -3,7 +3,7 @@ title: Table API and SQL - Use Delta Catalog type with Hive with Apache Flink®
 description: Learn about how to create Delta Catalog with Apache Flink® on Azure HDInsight on AKS
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 08/29/2023
+ms.date: 03/14/2024
 ---
 
 # Create Delta Catalog with Apache Flink® on Azure HDInsight on AKS
@@ -23,11 +23,12 @@ In this article, we learn how Apache Flink SQL/TableAPI is used to implement a D
 Once you launch the Secure Shell (SSH), let us start downloading the dependencies required to the SSH node, to illustrate the Delta table managed in Hive catalog.
 
 ```
-wget https://repo1.maven.org/maven2/io/delta/delta-standalone_2.12/3.0.0rc1/delta-standalone_2.12-3.0.0rc1.jar -P $FLINK_HOME/lib
-wget https://repo1.maven.org/maven2/io/delta/delta-flink/3.0.0rc1/delta-flink-3.0.0rc1.jar -P $FLINK_HOME/lib
-wget https://repo1.maven.org/maven2/com/chuusai/shapeless_2.12/2.3.4/shapeless_2.12-2.3.4.jar -P $FLINK_HOME/lib
-wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet/1.16.0/flink-parquet-1.16.0.jar -P $FLINK_HOME/lib
-wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop-bundle/1.12.2/parquet-hadoop-bundle-1.12.2.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/io/delta/delta-storage/3.0.0/delta-storage-3.0.0.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/io/delta/delta-standalone_2.12/3.0.0/delta-standalone_2.12-3.0.0.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/io/delta/delta-flink/3.0.0/delta-flink-3.0.0.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/com/chuusai/shapeless_2.12/2.3.4/shapeless_2.12-2.3.4.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-parquet/{flink.version}/flink-parquet-{flink.version}.jar -P $FLINK_HOME/lib
+wget https://repo1.maven.org/maven2/org/apache/parquet/parquet-hadoop-bundle/1.12.2/parquet-hadoop-bundle-1.12.2.jar -P $FLINK_HOME/lib
 ```
 
 ### Start the Apache Flink SQL Client
@@ -51,11 +52,12 @@ Using the delta catalog
 #### Add dependencies to server classpath
 
 ```sql
-ADD JAR '/opt/flink-webssh/lib/delta-flink-3.0.0rc1.jar';
-ADD JAR '/opt/flink-webssh/lib/delta-standalone_2.12-3.0.0rc1.jar';
+ADD JAR '/opt/flink-webssh/lib/delta-standalone_2.12-3.0.0.jar';
+ADD JAR '/opt/flink-webssh/lib/delta-storage-3.0.0.jar';
+ADD JAR '/opt/flink-webssh/lib/delta-flink-3.0.0.jar';
 ADD JAR '/opt/flink-webssh/lib/shapeless_2.12-2.3.4.jar';
 ADD JAR '/opt/flink-webssh/lib/parquet-hadoop-bundle-1.12.2.jar';
-ADD JAR '/opt/flink-webssh/lib/flink-parquet-1.16.0.jar';
+ADD JAR '/opt/flink-webssh/lib/flink-parquet-1.17.0.jar';
 ```
 #### Create Table

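The diff cuts off at the "Create Table" heading. As a rough sketch of the step that follows, the Table API code below registers a Delta catalog and creates a table through it. The option names (`'type' = 'delta-catalog'`, `'catalog-type' = 'hive'`, `'connector' = 'delta'`, `'table-path'`) come from the Delta Lake Flink connector documentation, and the table name and `abfs://` path are placeholders — verify the options against the delta-flink 3.0.0 jars pulled in above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DeltaCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a Delta catalog backed by the Hive Metastore
        // (option names per the delta-flink connector docs; verify for your version).
        tableEnv.executeSql(
                "CREATE CATALOG delta_catalog WITH ("
                        + " 'type' = 'delta-catalog',"
                        + " 'catalog-type' = 'hive'"
                        + ")");
        tableEnv.executeSql("USE CATALOG delta_catalog");

        // A Delta table whose data lives on ADLS Gen2 (placeholder path and schema).
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS orders ("
                        + " order_id BIGINT,"
                        + " item STRING,"
                        + " amount DOUBLE"
                        + ") WITH ("
                        + " 'connector' = 'delta',"
                        + " 'table-path' = 'abfs://<container>@<account>.dfs.core.windows.net/delta/orders'"
                        + ")");
    }
}
```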
articles/hdinsight-aks/flink/join-stream-kafka-table-filesystem.md

Lines changed: 23 additions & 18 deletions
@@ -1,22 +1,22 @@
 ---
 title: Enrich the events from Apache Kafka® with the attributes from FileSystem with Apache Flink®
-description: Learn how to join stream from Kafka with table from fileSystem using Apache Flink® DataStream API
+description: Learn how to join stream from Kafka with table from fileSystem using Apache Flink® DataStream API.
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 08/29/2023
+ms.date: 03/14/2024
 ---
 
 # Enrich the events from Apache Kafka® with attributes from ADLS Gen2 with Apache Flink®
 
 [!INCLUDE [feature-in-preview](../includes/feature-in-preview.md)]
 
-In this article, you can learn how you can enrich the real time events by joining a stream from Kafka with table on ADLS Gen2 using Flink Streaming. We use Flink Streaming API to join events from HDInsight Kafka with attributes from ADLS Gen2, further we use attributes-joined events to sink into another Kafka topic.
+In this article, you can learn how you can enrich the real time events by joining a stream from Kafka with table on ADLS Gen2 using Flink Streaming. We use Flink Streaming API to join events from HDInsight Kafka with attributes from ADLS Gen2. Further we use attributes-joined events to sink into another Kafka topic.
 
 ## Prerequisites
 
 * [Flink cluster on HDInsight on AKS](../flink/flink-create-cluster-portal.md)
 * [Kafka cluster on HDInsight](../../hdinsight/kafka/apache-kafka-get-started.md)
-* You're required to ensure the network settings are taken care as described on [Using Kafka on HDInsight](../flink/process-and-consume-data.md); that's to make sure HDInsight on AKS and HDInsight clusters are in the same VNet
+* Ensure the network settings are taken care as described on [Using Kafka on HDInsight](../flink/process-and-consume-data.md) to make sure HDInsight on AKS and HDInsight clusters are in the same VNet
 * For this demonstration, we're using a Window VM as maven project develop environment in the same VNet as HDInsight on AKS
 
 ## Kafka topic preparation
@@ -45,7 +45,7 @@ We're creating a topic called `user_events`.
 
 ## Prepare file on ADLS Gen2
 
-We are creating a file called `item attributes` in our storage
+We're creating a file called `item attributes` in our storage
 
 - The purpose is to read a batch of `item attributes` from a file on ADLS Gen2. Each item has the following fields:
 ```
@@ -59,7 +59,7 @@ We are creating a file called `item attributes` in our storage
 
 ## Develop the Apache Flink job
 
-In this step we perform the following activities
+In this step, we perform the following activities
 - Enrich the `user_events` topic from Kafka by joining with `item attributes` from a file on ADLS Gen2.
 - We push the outcome of this step, as an enriched user activity of events into a Kafka topic.
 
@@ -81,7 +81,7 @@ In this step we perform the following activities
 <properties>
 <maven.compiler.source>1.8</maven.compiler.source>
 <maven.compiler.target>1.8</maven.compiler.target>
-<flink.version>1.16.0</flink.version>
+<flink.version>1.17.0</flink.version>
 <java.version>1.8</java.version>
 <scala.binary.version>2.12</scala.binary.version>
 <kafka.version>3.2.0</kafka.version> //replace with 2.4.1 if you are using HDInsight Kafka 2.4.1
@@ -195,14 +195,19 @@ public class KafkaJoinGen2Demo {
 DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
 
 // Parse Kafka source data
-DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
-    @Override
-    public Tuple4<String, String, String, String> map(String value) throws Exception {
-        // Parse the line into a Tuple4
-        String[] parts = value.split(",");
-        return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
-    }
-});
+DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
+    @Override
+    public Tuple4<String, String, String, String> map(String value) throws Exception {
+        // Parse the line into a Tuple4
+        String[] parts = value.split(",");
+        if (parts.length < 4) {
+            // Log and skip malformed record
+            System.out.println("Malformed record: " + value);
+            return null;
+        }
+        return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
+    }
+});
 
 // 4. Enrich the user activity events by joining the items' attributes from a file
 DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());
@@ -254,7 +259,7 @@ public class KafkaJoinGen2Demo {
 }
 ```
 
-## Package jar and submit to Apache Flink
+## Package jar, and submit to Apache Flink
 
 We're submitting the packaged jar to Flink:
 
@@ -265,13 +270,13 @@ We're submitting the packaged jar to Flink:
 
 ### Produce real-time `user_events` topic on Kafka
 
-We are able to produce real-time user behavior event `user_events` in Kafka.
+We're able to produce real-time user behavior event `user_events` in Kafka.
 
 :::image type="content" source="./media/join-stream-kafka-table-filesystem/step-5-kafka-3-2.png" alt-text="Screenshot showing a real-time user behavior event on Kafka 3.2." border="true" lightbox="./media/join-stream-kafka-table-filesystem/step-5-kafka-3-2.png":::
 
 ### Consume the `itemAttributes` joining with `user_events` on Kafka
 
-We are now using `itemAttributes` on filesystem join user activity events `user_events`.
+We're now using `itemAttributes` on filesystem join user activity events `user_events`.
 
 :::image type="content" source="./media/join-stream-kafka-table-filesystem/step-6-kafka-3-2.png" alt-text="Screenshot showing Consume the item attributes-joined user activity events on Kafka 3.2." border="true" lightbox="./media/join-stream-kafka-table-filesystem/step-6-kafka-3-2.png":::

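One caveat on the parsing change in this diff: returning `null` from a `MapFunction` leaves a null element in the stream for every malformed record, so downstream operators (here `MyJoinFunction`) must guard against them. The more idiomatic Flink fix is a `FlatMapFunction`, which can simply emit nothing for a bad line. A sketch of that alternative, assuming the same `kafkaData` stream as in the article's `KafkaJoinGen2Demo`:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Parse Kafka source data, dropping malformed lines (after logging them)
// instead of forwarding null into the join stage.
DataStream<Tuple4<String, String, String, String>> userEvents =
        kafkaData.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, String>>() {
            @Override
            public void flatMap(String value, Collector<Tuple4<String, String, String, String>> out) {
                String[] parts = value.split(",");
                if (parts.length < 4) {
                    System.out.println("Malformed record: " + value); // a real job would use a logger
                    return; // emit nothing; the record is skipped
                }
                out.collect(new Tuple4<>(parts[0], parts[1], parts[2], parts[3]));
            }
        });
```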
[Binary image files changed (not rendered): +34.6 KB, +72.4 KB, −12.8 KB, −187 KB]

articles/hdinsight-aks/flink/use-hive-catalog.md

Lines changed: 6 additions & 6 deletions
@@ -3,7 +3,7 @@ title: Use Hive Catalog, Hive Read & Write demo on Apache Flink®
 description: Learn how to use Hive Catalog, Hive Read & Write demo on Apache Flink® on HDInsight on AKS
 ms.service: hdinsight-aks
 ms.topic: how-to
-ms.date: 10/27/2023
+ms.date: 03/18/2023
 ---
 
 # How to use Hive Catalog with Apache Flink® on HDInsight on AKS
@@ -143,19 +143,19 @@ mysql> desc orders;
 > Download the correct version jar according to our HDInsight kafka version and MySQL version.
 
 ```
-wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/1.16.0/flink-connector-jdbc-1.16.0.jar
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
 wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
 wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
-wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.16.0/flink-connector-kafka-1.16.0.jar
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar
 ```
 
 **Moving the planner jar**
 
 Move the jar flink-table-planner_2.12-1.16.0-0.0.18.jar located in webssh pod's /opt to /lib and move out the jar flink-table-planner-loader-1.16.0-0.0.18.jar from /lib. Refer to [issue](https://issues.apache.org/jira/browse/FLINK-25128) for more details. Perform the following steps to move the planner jar.
 
 ```
-mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.16.0-0.0.18.jar /opt/flink-webssh/lib/
-mv /opt/flink-webssh/lib/flink-table-planner-loader-1.16.0-0.0.18.jar /opt/flink-webssh/opt/
+mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-1.1.1.3.jar /opt/flink-webssh/opt/
+mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-1.1.1.3.jar /opt/flink-webssh/lib/
 ```
 
 > [!NOTE]
@@ -165,7 +165,7 @@ mv /opt/flink-webssh/lib/flink-table-planner-loader-1.16.0-0.0.18.jar /opt/flink
 ### Use bin/sql-client.sh to connect to Flink SQL
 
 ```
-bin/sql-client.sh -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.16.0.jar -j flink-connector-jdbc-1.16.0.jar -j mysql-connector-j-8.0.33.jar
+bin/sql-client.sh -j flink-connector-jdbc-3.1.0-1.17.jar -j mysql-connector-j-8.0.33.jar -j kafka-clients-3.2.0.jar -j flink-connector-kafka-1.17.0.jar
 ```
 
 ### Create Hive catalog and connect to the hive catalog on Flink SQL

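The diff ends at the "Create Hive catalog" heading. For orientation, the standard Flink SQL statements for that step, wrapped in the Table API, look roughly like the sketch below. The catalog name and `hive-conf-dir` are placeholders rather than values from the article, and the demo table uses Flink's built-in `datagen` connector.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Register a Hive catalog; 'hive-conf-dir' must point at the directory
        // that contains hive-site.xml (placeholder path here).
        tableEnv.executeSql(
                "CREATE CATALOG myhive WITH ("
                        + " 'type' = 'hive',"
                        + " 'default-database' = 'default',"
                        + " 'hive-conf-dir' = '/opt/hive-conf'"
                        + ")");
        tableEnv.executeSql("USE CATALOG myhive");

        // Tables created from here on are persisted in the Hive Metastore,
        // so they stay visible across Flink SQL sessions.
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS orders_demo ("
                        + " order_id STRING, amount DOUBLE"
                        + ") WITH ('connector' = 'datagen')");
    }
}
```

The same `CREATE CATALOG` / `USE CATALOG` statements can be run verbatim in the `bin/sql-client.sh` session started above.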