Skip to content

Commit 0ebbe2f

Browse files
committed
fix
1 parent 38e1376 commit 0ebbe2f

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka10JsonTableSink.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.streaming.api.datastream.DataStream;
2222
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
2323
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
24-
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
24+
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
2525
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
2626
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
2727
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
@@ -67,8 +67,8 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properti
6767
}
6868

6969
@Override
70-
protected Kafka09JsonTableSink createCopy() {
71-
return new Kafka09JsonTableSink(topic, properties, partitioner);
70+
protected Kafka010JsonTableSink createCopy() {
71+
return new Kafka010JsonTableSink(topic, properties, partitioner);
7272
}
7373

7474
@Override

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.streaming.api.datastream.DataStream;
2222
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
2323
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
24-
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSink;
24+
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
2525
import org.apache.flink.streaming.connectors.kafka.KafkaJsonTableSink;
2626
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
2727
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
@@ -33,6 +33,7 @@
3333
import java.util.Properties;
3434

3535
/**
36+
*
3637
* Reason: add schema info
3738
* Date: 2019/4/8
3839
* Company: www.dtstack.com
@@ -60,15 +61,15 @@ public CustomerKafka11JsonTableSink(String topic, Properties properties, KafkaPa
6061
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
6162
this.schema = schema;
6263
}
63-
64+
//TODO 暂时使用010
6465
@Override
6566
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
6667
return new FlinkKafkaProducer010<Row>(topic, serializationSchema, properties, partitioner);
6768
}
6869

6970
@Override
70-
protected Kafka09JsonTableSink createCopy() {
71-
return new Kafka09JsonTableSink(topic, properties, partitioner);
71+
protected Kafka010JsonTableSink createCopy() {
72+
return new Kafka010JsonTableSink(topic, properties, partitioner);
7273
}
7374

7475
@Override

0 commit comments

Comments
 (0)