Skip to content

Commit 69dca6c

Browse files
author
toutian
committed
Merge branch '1.5_v3.5.2_check_kafkalist' into '1.5_v3.5.2'
check BOOTSTRAPSERVERS_KEY See merge request !3
2 parents 409c08e + 8c40de0 commit 69dca6c

File tree

3 files changed

+3
-3
lines changed

3 files changed

+3
-3
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
6969
parseFieldsInfo(fieldsInfo, kafka09SourceTableInfo);
7070

7171
kafka09SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
72-
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
72+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
7373
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
7474
throw new Exception("BootstrapServers can not be empty!");
7575
} else {

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
7474

7575
kafka10SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
7676

77-
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
77+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
7878
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
7979
throw new Exception("BootstrapServers can not be empty!");
8080
} else {

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
7373
parseFieldsInfo(fieldsInfo, kafka11SourceTableInfo);
7474

7575
kafka11SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
76-
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
76+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase()));
7777
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
7878
throw new Exception("BootstrapServers can not be empty!");
7979
} else {

0 commit comments

Comments
 (0)