Skip to content

Commit 409c08e

Browse files
author
toutian
committed
Merge branch '1.5_v3.5.1_check_kafkalist' into '1.5_v3.5.1'
kafka brokerlist 是空的不允许运行和提交 See merge request !2
2 parents cfbcd1e + b941f39 commit 409c08e

File tree

4 files changed

+23
-7
lines changed

4 files changed

+23
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected boolean fieldNameNeedsUpperCase() {
5656
return true;
5757
}
5858

59-
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props);
59+
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception;
6060

6161
public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
6262
for(Map.Entry<String, Pattern> keyPattern : keyPatternMap.entrySet()){

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,19 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6262
}
6363

6464
@Override
65-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
65+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception {
6666

6767
KafkaSourceTableInfo kafka09SourceTableInfo = new KafkaSourceTableInfo();
6868
kafka09SourceTableInfo.setName(tableName);
6969
parseFieldsInfo(fieldsInfo, kafka09SourceTableInfo);
7070

7171
kafka09SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
72-
kafka09SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
72+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
73+
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
74+
throw new Exception("BootstrapServers can not be empty!");
75+
} else {
76+
kafka09SourceTableInfo.setBootstrapServers(bootstrapServer);
77+
}
7378
kafka09SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
7479
kafka09SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
7580
kafka09SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,20 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6666
}
6767

6868
@Override
69-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
69+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception {
7070

7171
KafkaSourceTableInfo kafka10SourceTableInfo = new KafkaSourceTableInfo();
7272
kafka10SourceTableInfo.setName(tableName);
7373
parseFieldsInfo(fieldsInfo, kafka10SourceTableInfo);
7474

7575
kafka10SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
76-
kafka10SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
76+
77+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
78+
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
79+
throw new Exception("BootstrapServers can not be empty!");
80+
} else {
81+
kafka10SourceTableInfo.setBootstrapServers(bootstrapServer);
82+
}
7783
kafka10SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
7884
kafka10SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
7985
kafka10SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,19 @@ static void dealNestField(Matcher matcher, TableInfo tableInfo) {
6666
}
6767

6868
@Override
69-
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
69+
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception {
7070

7171
KafkaSourceTableInfo kafka11SourceTableInfo = new KafkaSourceTableInfo();
7272
kafka11SourceTableInfo.setName(tableName);
7373
parseFieldsInfo(fieldsInfo, kafka11SourceTableInfo);
7474

7575
kafka11SourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
76-
kafka11SourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
76+
String bootstrapServer = MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase()));
77+
if (bootstrapServer == null || bootstrapServer.trim().equals("")){
78+
throw new Exception("BootstrapServers can not be empty!");
79+
} else {
80+
kafka11SourceTableInfo.setBootstrapServers(bootstrapServer);
81+
}
7782
kafka11SourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
7883
kafka11SourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
7984
kafka11SourceTableInfo.setOffsetReset(MathUtil.getString(props.get(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase())));

0 commit comments

Comments
 (0)