Skip to content

Commit 6bb6169

Browse files
authored
Updated Kafka Version
Updated Kafka Version
1 parent 0d4527b commit 6bb6169

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

articles/hdinsight-aks/flink/join-stream-kafka-table-filesystem.md

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ title: Enrich the events from Apache Kafka® with the attributes from FileSystem
33
description: Learn how to join stream from Kafka with table from fileSystem using Apache Flink® DataStream API
44
ms.service: hdinsight-aks
55
ms.topic: how-to
6-
ms.date: 08/29/2023
6+
ms.date: 03/14/2024
77
---
88

99
# Enrich the events from Apache Kafka® with attributes from ADLS Gen2 with Apache Flink®
@@ -81,7 +81,7 @@ In this step we perform the following activities
8181
<properties>
8282
<maven.compiler.source>1.8</maven.compiler.source>
8383
<maven.compiler.target>1.8</maven.compiler.target>
84-
<flink.version>1.16.0</flink.version>
84+
<flink.version>1.17.0</flink.version>
8585
<java.version>1.8</java.version>
8686
<scala.binary.version>2.12</scala.binary.version>
8787
<kafka.version>3.2.0</kafka.version> //replace with 2.4.1 if you are using HDInsight Kafka 2.4.1
@@ -195,14 +195,19 @@ public class KafkaJoinGen2Demo {
195195
DataStream<String> kafkaData = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
196196

197197
// Parse Kafka source data
198-
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
199-
@Override
200-
public Tuple4<String, String, String, String> map(String value) throws Exception {
201-
// Parse the line into a Tuple4
202-
String[] parts = value.split(",");
203-
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
204-
}
205-
});
198+
DataStream<Tuple4<String, String, String, String>> userEvents = kafkaData.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
199+
@Override
200+
public Tuple4<String, String, String, String> map(String value) throws Exception {
201+
// Parse the line into a Tuple4
202+
String[] parts = value.split(",");
203+
if (parts.length < 4) {
204+
// Log and skip malformed record
205+
System.out.println("Malformed record: " + value);
206+
return null;
207+
}
208+
return new Tuple4<>(parts[0], parts[1], parts[2], parts[3]);
209+
}
210+
});
206211

207212
// 4. Enrich the user activity events by joining the items' attributes from a file
208213
DataStream<Tuple7<String,String,String,String,String,String,String>> enrichedData = userEvents.map(new MyJoinFunction());

0 commit comments

Comments
 (0)