Skip to content

Commit 4090b9d

Browse files
committed
kafka11sink
1 parent 03bc6b6 commit 4090b9d

File tree

6 files changed

+55
-171
lines changed

6 files changed

+55
-171
lines changed

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerCsvSerialization.java

Lines changed: 0 additions & 138 deletions
This file was deleted.

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka11JsonTableSink.java

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,9 @@
2121
import org.apache.flink.streaming.api.datastream.DataStream;
2222
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
2323
import org.apache.flink.streaming.connectors.kafka.Kafka011TableSink;
24-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
25-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
2624
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
27-
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
28-
import org.apache.flink.table.util.TableConnectorUtil;
25+
import org.apache.flink.table.api.TableSchema;
26+
import org.apache.flink.table.utils.TableConnectorUtils;
2927
import org.apache.flink.types.Row;
3028

3129
import java.util.Optional;
@@ -41,26 +39,19 @@
4139
*/
4240
public class CustomerKafka11JsonTableSink extends Kafka011TableSink {
4341

44-
4542
protected SerializationSchema schema;
4643

47-
public CustomerKafka11JsonTableSink(String topic, Properties properties, SerializationSchema schema) {
48-
super(topic, properties, new FlinkFixedPartitioner<>());
49-
this.schema = schema;
50-
}
5144

52-
public CustomerKafka11JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, SerializationSchema schema) {
53-
super(topic, properties, partitioner);
54-
this.schema = schema;
55-
}
45+
public CustomerKafka11JsonTableSink(TableSchema schema,
46+
String topic,
47+
Properties properties,
48+
Optional<FlinkKafkaPartitioner<Row>> partitioner,
49+
SerializationSchema<Row> serializationSchema) {
5650

57-
58-
@Deprecated
59-
public CustomerKafka11JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, SerializationSchema schema) {
60-
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
61-
this.schema = schema;
51+
super(schema, topic, properties, partitioner, serializationSchema);
52+
this.schema = serializationSchema;
6253
}
63-
//TODO 暂时使用010
54+
6455
@Override
6556
protected SinkFunction<Row> createKafkaProducer(String s, Properties properties, SerializationSchema<Row> serializationSchema, Optional<FlinkKafkaPartitioner<Row>> optional) {
6657
return new CustomerFlinkKafkaProducer011<Row>(topic, serializationSchema, properties);
@@ -71,6 +62,6 @@ public void emitDataStream(DataStream<Row> dataStream) {
7162
SinkFunction<Row> kafkaProducer = createKafkaProducer(topic, properties, schema, partitioner);
7263
// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
7364
//kafkaProducer.setFlushOnCheckpoint(true);
74-
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
65+
dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
7566
}
7667
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,11 @@ public class KafkaSink implements AppendStreamTableSink<Row>, IStreamSinkGener<
7474
@Override
7575
public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
7676
KafkaSinkTableInfo kafka011SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
77-
this.topic = kafka011SinkTableInfo.getKafkaParam("topic");
77+
this.topic = kafka011SinkTableInfo.getTopic();
7878

7979
Properties props = new Properties();
80+
props.setProperty("bootstrap.servers", kafka011SinkTableInfo.getBootstrapServers());
81+
8082
for (String key:kafka011SinkTableInfo.getKafkaParamKeys()) {
8183
props.setProperty(key, kafka011SinkTableInfo.getKafkaParam(key));
8284
}
@@ -97,17 +99,14 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
9799

98100
//this.serializationSchema = Optional.of(JsonRowSerializationSchema.class);
99101
if ("json".equalsIgnoreCase(kafka011SinkTableInfo.getSinkDataType())) {
100-
this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
101-
} else if ("csv".equalsIgnoreCase(kafka011SinkTableInfo.getSinkDataType())){
102-
this.serializationSchema = new TypeInformationSerializationSchema(TypeInformation.of(Row.class),
103-
new CustomerCsvSerialization(kafka011SinkTableInfo.getFieldDelimiter(),fieldTypes));
102+
this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
104103
}
105104
return this;
106105
}
107106

108107
@Override
109108
public void emitDataStream(DataStream<Row> dataStream) {
110-
KafkaTableSinkBase kafkaTableSink = new Kafka011TableSink(
109+
KafkaTableSinkBase kafkaTableSink = new CustomerKafka11JsonTableSink(
111110
schema,
112111
topic,
113112
properties,

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,24 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
3939
kafka11SinkTableInfo.setName(tableName);
4040
parseFieldsInfo(fieldsInfo, kafka11SinkTableInfo);
4141
kafka11SinkTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase())));
42+
4243
if (props.get(KafkaSinkTableInfo.SINK_DATA_TYPE) != null) {
4344
kafka11SinkTableInfo.setSinkDataType(props.get(KafkaSinkTableInfo.SINK_DATA_TYPE).toString());
4445
}
45-
if (props.get(KafkaSinkTableInfo.FIELD_DELINITER) != null) {
46-
kafka11SinkTableInfo.setFieldDelimiter(props.get(KafkaSinkTableInfo.FIELD_DELINITER).toString());
47-
}
4846

49-
for (String key:props.keySet()) {
47+
// if (props.get(KafkaSinkTableInfo.FIELD_DELINITER) != null) {
48+
// kafka11SinkTableInfo.setFieldDelimiter(props.get(KafkaSinkTableInfo.FIELD_DELINITER).toString());
49+
// }
50+
51+
kafka11SinkTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSinkTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
52+
kafka11SinkTableInfo.setTopic(MathUtil.getString(props.get(KafkaSinkTableInfo.TOPIC_KEY.toLowerCase())));
53+
for (String key : props.keySet()) {
5054
if (!key.isEmpty() && key.startsWith("kafka.")) {
5155
kafka11SinkTableInfo.addKafkaParam(key.substring(6), props.get(key).toString());
5256
}
5357
}
58+
kafka11SinkTableInfo.check();
59+
5460
return kafka11SinkTableInfo;
5561
}
5662
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkTableInfo.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,17 @@ public class KafkaSinkTableInfo extends TargetTableInfo {
4040
public KafkaSinkTableInfo(){
4141
super.setType(CURR_TYPE);
4242
}
43+
public static final String BOOTSTRAPSERVERS_KEY = "bootstrapServers";
44+
45+
public static final String TOPIC_KEY = "topic";
46+
47+
private String bootstrapServers;
4348

4449
public Map<String,String> kafkaParam = new HashMap<String,String>();
4550

51+
private String topic;
52+
53+
4654
public void addKafkaParam(String key,String value){
4755
kafkaParam.put(key,value);
4856
}
@@ -55,10 +63,28 @@ public Set<String> getKafkaParamKeys(){
5563
return kafkaParam.keySet();
5664
}
5765

66+
67+
public String getBootstrapServers() {
68+
return bootstrapServers;
69+
}
70+
71+
public void setBootstrapServers(String bootstrapServers) {
72+
this.bootstrapServers = bootstrapServers;
73+
}
74+
75+
public String getTopic() {
76+
return topic;
77+
}
78+
79+
public void setTopic(String topic) {
80+
this.topic = topic;
81+
}
82+
83+
5884
@Override
5985
public boolean check() {
60-
Preconditions.checkNotNull(kafkaParam.get("bootstrap.servers"), "kafka of bootstrapServers is required");
61-
Preconditions.checkNotNull(kafkaParam.get("topic"), "kafka of topic is required");
86+
Preconditions.checkNotNull(bootstrapServers, "kafka of bootstrapServers is required");
87+
Preconditions.checkNotNull(topic, "kafka of topic is required");
6288
//Preconditions.checkNotNull(kafkaParam.get("groupId"), "kafka of groupId is required");
6389
return false;
6490
}

kafka11/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
<modules>
1616
<module>kafka11-source</module>
17-
<!--<module>kafka11-sink</module>-->
17+
<module>kafka11-sink</module>
1818
</modules>
1919

2020
<dependencies>

0 commit comments

Comments (0)