Commit cff5fbf

[hotfix-#1960][kafka] KafkaSyncConverter construct parameter transmission error (#1966)
1 parent b5d9c0d commit cff5fbf
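
In short: both KafkaSyncConverter constructors called super(null, kafkaConfig), so the RowType expected by the superclass converter was never transmitted from the factories, and KafkaSyncKeyConverter's delegating constructor propagated the same null. This commit adds a RowType parameter to the KafkaSyncConverter and KafkaSyncKeyConverter constructors and updates KafkaSinkFactory and KafkaSourceFactory to build the type via TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply) and pass it through.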

File tree

4 files changed: 24 additions & 14 deletions

chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncConverter.java

Lines changed: 5 additions & 4 deletions
@@ -47,6 +47,7 @@
 import com.dtstack.chunjun.util.MapUtil;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.commons.collections.CollectionUtils;
@@ -87,8 +88,8 @@ public class KafkaSyncConverter
     /** kafka sink out fields */
     protected List<String> outList;
 
-    public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
-        super(null, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType, List<String> keyTypeList) {
+        super(rowType, kafkaConfig);
         this.kafkaConfig = kafkaConfig;
         this.outList = keyTypeList;
         this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
@@ -99,8 +100,8 @@ public KafkaSyncConverter(KafkaConfig kafkaConfig, List<String> keyTypeList) {
         }
     }
 
-    public KafkaSyncConverter(KafkaConfig kafkaConfig) {
-        super(null, kafkaConfig);
+    public KafkaSyncConverter(KafkaConfig kafkaConfig, RowType rowType) {
+        super(rowType, kafkaConfig);
         this.commonConfig = this.kafkaConfig = kafkaConfig;
         this.jsonDecoder = new JsonDecoder(kafkaConfig.isAddMessage());
         if (DEFAULT_CODEC.defaultValue().equals(kafkaConfig.getCodec())) {
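
To see why the hard-coded null matters, here is a minimal self-contained sketch of the same pattern (hypothetical BaseConverter/Config names, not ChunJun classes; only the constructor chain is modeled):

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;

public class ConstructorTransmissionDemo {
    static class Config {} // stand-in for KafkaConfig

    abstract static class BaseConverter {
        protected final RowType rowType; // the schema the converter works against

        BaseConverter(RowType rowType, Config config) {
            this.rowType = rowType;
        }
    }

    // Buggy shape: the schema is swallowed and null is transmitted upward.
    static class BrokenConverter extends BaseConverter {
        BrokenConverter(Config config) {
            super(null, config); // rowType is always null downstream
        }
    }

    // Fixed shape: the caller supplies the RowType and it is forwarded.
    static class FixedConverter extends BaseConverter {
        FixedConverter(Config config, RowType rowType) {
            super(rowType, config);
        }
    }

    public static void main(String[] args) {
        RowType rowType = RowType.of(new IntType()); // any concrete schema
        System.out.println(new BrokenConverter(new Config()).rowType); // prints null
        System.out.println(new FixedConverter(new Config(), rowType).rowType); // prints the row type
    }
}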

chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/converter/KafkaSyncKeyConverter.java

Lines changed: 6 additions & 5 deletions
@@ -25,6 +25,7 @@
 import com.dtstack.chunjun.util.MapUtil;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.commons.lang3.StringUtils;
@@ -41,15 +42,15 @@
  */
 public class KafkaSyncKeyConverter extends KafkaSyncConverter {
 
-    private final PartitionStrategy partitionStrategy;
+    private PartitionStrategy partitionStrategy;
 
-    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, List<String> keyTypeList) {
-        super(kafkaConf, keyTypeList);
+    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType, List<String> keyTypeList) {
+        super(kafkaConf, rowType, keyTypeList);
         this.partitionStrategy = PartitionStrategy.fromValue(kafkaConf.getPartitionStrategy());
     }
 
-    public KafkaSyncKeyConverter(KafkaConfig kafkaConf) {
-        this(kafkaConf, null);
+    public KafkaSyncKeyConverter(KafkaConfig kafkaConf, RowType rowType) {
+        this(kafkaConf, rowType, null);
     }
 
     @Override
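
Note the constructor delegation: the two-argument form now forwards the RowType before defaulting the key list to null, so both entry points transmit the schema. Usage as the sink factory below wires it (kafkaConfig and rowType assumed in scope):

// Three-argument form, used on the partition-assign path:
KafkaSyncKeyConverter keyConverter =
        new KafkaSyncKeyConverter(kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns());

// Two-argument form delegates to this(kafkaConf, rowType, null):
KafkaSyncKeyConverter plainKeyConverter = new KafkaSyncKeyConverter(kafkaConfig, rowType);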

chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/KafkaSinkFactory.java

Lines changed: 9 additions & 4 deletions
@@ -29,12 +29,14 @@
 import com.dtstack.chunjun.converter.RawTypeMapper;
 import com.dtstack.chunjun.sink.SinkFactory;
 import com.dtstack.chunjun.util.GsonUtil;
+import com.dtstack.chunjun.util.TableUtil;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -96,6 +98,9 @@ protected DataStreamSink<RowData> createOutput(
                     "when kafka sink dataCompelOrder set true , Parallelism must 1.");
         }
 
+        RowType rowType =
+                TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
+
         RowSerializationSchema rowSerializationSchema;
         if (!CollectionUtil.isNullOrEmpty(kafkaConfig.getPartitionAssignColumns())) {
             Preconditions.checkState(
@@ -115,15 +120,15 @@ protected DataStreamSink<RowData> createOutput(
                             kafkaConfig,
                             new CustomerFlinkPartition<>(),
                             new KafkaSyncKeyConverter(
-                                    kafkaConfig, kafkaConfig.getPartitionAssignColumns()),
-                            new KafkaSyncConverter(kafkaConfig));
+                                    kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns()),
+                            new KafkaSyncConverter(kafkaConfig, rowType));
         } else {
             rowSerializationSchema =
                     new RowSerializationSchema(
                             kafkaConfig,
                             new CustomerFlinkPartition<>(),
-                            new KafkaSyncKeyConverter(kafkaConfig),
-                            new KafkaSyncConverter(kafkaConfig));
+                            new KafkaSyncKeyConverter(kafkaConfig, rowType),
+                            new KafkaSyncConverter(kafkaConfig, rowType));
         }
 
         KafkaProducer kafkaProducer =
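
The schema itself comes from the job's configured column list: TableUtil.createRowType maps each column through the connector's raw type mapping, and the resulting RowType is built once per createOutput call and shared by the key and value converters, so both serialize against the same schema. Condensed (identifiers as in the hunk above, kafkaConfig assumed populated):

// Derive the Flink RowType from the configured columns once...
RowType rowType =
        TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
// ...and hand the same instance to both converters instead of null:
KafkaSyncKeyConverter keyConverter = new KafkaSyncKeyConverter(kafkaConfig, rowType);
KafkaSyncConverter valueConverter = new KafkaSyncConverter(kafkaConfig, rowType);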

chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/source/KafkaSourceFactory.java

Lines changed: 4 additions & 1 deletion
@@ -118,7 +118,10 @@ public DataStream<RowData> createSource() {
         RowType rowType =
                 TableUtil.createRowType(kafkaConfig.getColumn(), KafkaRawTypeMapping::apply);
         DynamicKafkaDeserializationSchema deserializationSchema =
-                new RowDeserializationSchema(kafkaConfig, new KafkaSyncConverter(kafkaConfig));
+                new RowDeserializationSchema(
+                        kafkaConfig,
+                        new KafkaSyncConverter(
+                                kafkaConfig, rowType, kafkaConfig.getPartitionAssignColumns()));
         KafkaConsumerWrapper consumer =
                 new KafkaConsumerWrapper(topics, deserializationSchema, props);
         switch (kafkaConfig.getMode()) {
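
Note the read path: KafkaSourceFactory already built rowType two lines above the changed call but previously discarded it, constructing the converter from kafkaConfig alone. The patch switches to the three-argument constructor, passing rowType along with kafkaConfig.getPartitionAssignColumns() as the key-field list, mirroring the sink-side wiring.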
