Skip to content

Commit ef4269a

Browse files
author
yanxi0227
committed
support offsetnum
1 parent 4d02007 commit ef4269a

File tree

3 files changed

+15
-3
lines changed
  • kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka
  • kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka

3 files changed

+15
-3
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6868

6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka09SourceTableInfo.getBootstrapServers());
71-
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
71+
if (DtStringUtil.isJosn(kafka09SourceTableInfo.getOffsetReset())){
72+
props.setProperty("auto.offset.reset", "none");
73+
} else {
74+
props.setProperty("auto.offset.reset", kafka09SourceTableInfo.getOffsetReset());
75+
}
7276
if (StringUtils.isNotBlank(kafka09SourceTableInfo.getGroupId())){
7377
props.setProperty("group.id", kafka09SourceTableInfo.getGroupId());
7478
}

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6868

6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka010SourceTableInfo.getBootstrapServers());
71-
props.setProperty("auto.offset.reset", kafka010SourceTableInfo.getOffsetReset());
71+
if (DtStringUtil.isJosn(kafka010SourceTableInfo.getOffsetReset())){
72+
props.setProperty("auto.offset.reset", "none");
73+
} else {
74+
props.setProperty("auto.offset.reset", kafka010SourceTableInfo.getOffsetReset());
75+
}
7276
if (StringUtils.isNotBlank(kafka010SourceTableInfo.getGroupId())){
7377
props.setProperty("group.id", kafka010SourceTableInfo.getGroupId());
7478
}

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,11 @@ public Table genStreamSource(SourceTableInfo sourceTableInfo, StreamExecutionEnv
6868

6969
Properties props = new Properties();
7070
props.setProperty("bootstrap.servers", kafka011SourceTableInfo.getBootstrapServers());
71-
props.setProperty("auto.offset.reset", kafka011SourceTableInfo.getOffsetReset());
71+
if (DtStringUtil.isJosn(kafka011SourceTableInfo.getOffsetReset())){
72+
props.setProperty("auto.offset.reset", "none");
73+
} else {
74+
props.setProperty("auto.offset.reset", kafka011SourceTableInfo.getOffsetReset());
75+
}
7276
if (StringUtils.isNotBlank(kafka011SourceTableInfo.getGroupId())){
7377
props.setProperty("group.id", kafka011SourceTableInfo.getGroupId());
7478
}

0 commit comments

Comments
 (0)